dxf, server: tune DXF task scheduling controls#69597
Skipping CI for Draft Pull Request.
📝 Walkthrough
Adds a runtime-configurable DXF max-concurrent-task setting, updates scheduler/storage/benchmark paths to read it dynamically, exposes it over HTTP, rewrites task-history transfer to batch SQL, and refactors import cleanup into batch processing with parallel metering.
Changes
DXF concurrency and task-history flow
Import cleanup batching and metering
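As a rough illustration of the runtime-configurable setting and its HTTP exposure described in the walkthrough, the sketch below keeps the limit in an atomic value and serves it on one handler. Everything here is an assumption made for illustration: the default of 16, the `value` query parameter, the plain-text response, and the helper `doRequest` are hypothetical and are not taken from the PR.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strconv"
	"sync/atomic"
)

// maxConcurrentTask holds the current limit. The default of 16 is a
// placeholder chosen for this sketch, not a value confirmed by the PR.
var maxConcurrentTask atomic.Int64

func init() { maxConcurrentTask.Store(16) }

// GetMaxConcurrentTask is read on every use, so a change takes effect
// without a restart.
func GetMaxConcurrentTask() int { return int(maxConcurrentTask.Load()) }

// SetMaxConcurrentTask updates the limit at runtime.
func SetMaxConcurrentTask(v int) { maxConcurrentTask.Store(int64(v)) }

// maxConcurrentHandler answers GET with the current value and POST with
// an update taken from the hypothetical "value" query parameter.
func maxConcurrentHandler(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodGet:
		fmt.Fprintf(w, "%d", GetMaxConcurrentTask())
	case http.MethodPost:
		v, err := strconv.Atoi(r.URL.Query().Get("value"))
		if err != nil || v <= 0 {
			http.Error(w, "value must be a positive integer", http.StatusBadRequest)
			return
		}
		SetMaxConcurrentTask(v)
		fmt.Fprintf(w, "%d", v)
	default:
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
	}
}

// doRequest drives the handler in-process and returns status and body.
func doRequest(method, target string) (int, string) {
	rec := httptest.NewRecorder()
	maxConcurrentHandler(rec, httptest.NewRequest(method, target, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	fmt.Println(doRequest(http.MethodGet, "/dxf/task/max_concurrent"))
	fmt.Println(doRequest(http.MethodPost, "/dxf/task/max_concurrent?value=8"))
	fmt.Println(GetMaxConcurrentTask())
}
```

Callers that derive other limits from this value (for example a query limit) would call the getter at the point of use rather than caching it.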
Estimated code review effort: 4 (Complex) | ~60 minutes
🚥 Pre-merge checks: ✅ 5 passed
Actionable comments posted: 1
🧹 Nitpick comments (1)
pkg/dxf/framework/scheduler/scheduler_manager_nokit_test.go (1)
131-143: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win
Test doesn't exercise the real batch-limit enforcement.
This scenario mocks `GetTasksInStates` to already return a capped slice (`manyTasks[:maxCleanupTaskBatchSize]`), so it only verifies that `doCleanupTask` forwards whatever `TaskManager` returns; it does not validate that the real SQL `limit 100` in `storage/task_table.go` is actually what produces that batch. Combined with `maxCleanupTaskBatchSize` not being referenced by production code (see comment on `scheduler_manager.go`), this test can pass even if the real storage-layer limit diverges from 100.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/dxf/framework/scheduler/scheduler_manager_nokit_test.go` around lines 131 - 143, The cleanup batching test is only asserting the mocked return value from GetTasksInStates, so it does not validate the real batch-limit behavior. Update doCleanupTask-related testing in scheduler_manager_nokit_test.go to use an over-limit task set and verify that the storage-facing path, not the mock slice, enforces the limit; reference doCleanupTask, GetTasksInStates, and TransferTasks2History when adjusting the expectations. If the production limit is defined in storage/task_table.go, align the test with that source of truth instead of maxCleanupTaskBatchSize so the test fails when the real batch size diverges.
📒 Files selected for processing (17)
- pkg/dxf/framework/integrationtests/bench_test.go
- pkg/dxf/framework/proto/BUILD.bazel
- pkg/dxf/framework/proto/task.go
- pkg/dxf/framework/proto/task_test.go
- pkg/dxf/framework/scheduler/BUILD.bazel
- pkg/dxf/framework/scheduler/interface.go
- pkg/dxf/framework/scheduler/scheduler.go
- pkg/dxf/framework/scheduler/scheduler_manager.go
- pkg/dxf/framework/scheduler/scheduler_manager_nokit_test.go
- pkg/dxf/framework/scheduler/scheduler_nokit_test.go
- pkg/dxf/framework/scheduler/scheduler_test.go
- pkg/dxf/framework/storage/table_test.go
- pkg/dxf/framework/storage/task_table.go
- pkg/server/handler/tests/dxf_test.go
- pkg/server/handler/tikvhandler/dxf.go
- pkg/server/http_status.go
- tests/realtikvtest/addindextest1/disttask_test.go
const maxCleanupTaskBatchSize = 100
📐 Maintainability & Code Quality | 🟠 Major | ⚡ Quick win
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify maxCleanupTaskBatchSize is not used in production code, and find the real enforcement point.
rg -nP '\bmaxCleanupTaskBatchSize\b' --type=go
rg -nP 'limit\s+100' --type=go pkg/dxf/framework/storage
Repository: pingcap/tidb
Length of output: 150
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Locate relevant files and inspect the surrounding code that defines and uses the cleanup batch size.
git ls-files 'pkg/dxf/framework/**' | rg 'scheduler_manager\.go|task_table\.go|scheduler_manager_nokit_test\.go|proto|interface\.go'
echo '--- scheduler_manager.go ---'
sed -n '1,260p' pkg/dxf/framework/scheduler/scheduler_manager.go
echo '--- task_table.go ---'
sed -n '1,260p' pkg/dxf/framework/storage/task_table.go
echo '--- scheduler_manager_nokit_test.go ---'
sed -n '1,220p' pkg/dxf/framework/scheduler/scheduler_manager_nokit_test.go
Repository: pingcap/tidb
Length of output: 24775
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find every reference to the cleanup batch-size constant and the SQL limit in the framework.
rg -n 'maxCleanupTaskBatchSize|limit 100|GetTasksInStates|cleanupFinishedTasks|doCleanupTask' pkg/dxf/framework
Repository: pingcap/tidb
Length of output: 5745
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect any surrounding declarations if the batch size constant lives in a nearby package.
rg -n 'Max.*Cleanup|Cleanup.*Batch|BatchSize|GetMaxConcurrentTask|MaxConcurrentTask' pkg/dxf/framework
Repository: pingcap/tidb
Length of output: 5420
🏁 Script executed:
#!/bin/bash
set -euo pipefail
sed -n '425,470p' pkg/dxf/framework/scheduler/scheduler_manager.go
sed -n '434,452p' pkg/dxf/framework/storage/task_table.go
Repository: pingcap/tidb
Length of output: 2263
Deduplicate the cleanup batch size constant.
maxCleanupTaskBatchSize is only used by scheduler_manager_nokit_test.go; the actual cleanup limit is still the hardcoded limit 100 in pkg/dxf/framework/storage/task_table.go. That leaves two independent copies of the same value. Move the batch size to a shared constant and use it in both places.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pkg/dxf/framework/scheduler/scheduler_manager.go` around lines 54 - 55, The
cleanup batch size is duplicated between maxCleanupTaskBatchSize and the
hardcoded limit in task_table.go, so there are two independent sources of truth.
Move the value into a shared constant that can be referenced by both
scheduler_manager_nokit_test.go and the cleanup logic in task_table.go, then
replace both literals with that shared symbol to keep the limit consistent.
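A minimal sketch of the suggested deduplication, assuming the constant can live in a package visible to both the storage layer and the scheduler test. The name `MaxCleanupTaskBatchSize`, the helper `cleanupQuery`, the table name `tasks`, and the `?` placeholder style are all illustrative assumptions, not the PR's actual code.

```go
package main

import (
	"fmt"
	"strings"
)

// MaxCleanupTaskBatchSize is the one shared definition (hypothetical
// name). The SQL below and any test reference it instead of a literal 100.
const MaxCleanupTaskBatchSize = 100

// cleanupQuery builds a bounded query for finished tasks. The limit is
// passed as an argument, so no "limit 100" literal appears in the SQL.
// It expects at least one state; an empty list would yield invalid SQL.
func cleanupQuery(states []string) (string, []any) {
	placeholders := strings.TrimSuffix(strings.Repeat("?, ", len(states)), ", ")
	sql := fmt.Sprintf(
		"select id from tasks where state in (%s) order by id limit ?",
		placeholders,
	)
	args := make([]any, 0, len(states)+1)
	for _, s := range states {
		args = append(args, s)
	}
	args = append(args, MaxCleanupTaskBatchSize)
	return sql, args
}

func main() {
	sql, args := cleanupQuery([]string{"succeed", "failed"})
	fmt.Println(sql)
	fmt.Println(args...)
}
```

A test can then assert against `MaxCleanupTaskBatchSize` directly, so it fails if the storage-layer limit and the scheduler's expectation ever diverge.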
[APPROVALNOTIFIER] This PR is NOT APPROVED
Codecov Report
❌ Patch coverage is
Please upload reports for the commit 7f617ad to get more accurate results.
Additional details and impacted files
@@ Coverage Diff @@
## release-nextgen-202603 #69597 +/- ##
===========================================================
Coverage ? 76.1399%
===========================================================
Files ? 1935
Lines ? 540255
Branches ? 0
===========================================================
Hits ? 411350
Misses ? 128905
Partials ? 0
🧹 Nitpick comments (1)
pkg/dxf/framework/storage/task_table.go (1)
443-449: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low value
Document the `*3` limit multiplier rationale.
The existing `GetMaxConcurrentTask()*2` limit for top unfinished tasks is documented in `interface.go` ("returns unfinished tasks, limited by GetMaxConcurrentTask()*2, to make sure low ranking tasks can be scheduled if resource is enough"). The new `*3` multiplier here has no such explanation, making it unclear why this specific factor was chosen versus `*2` or a fixed value.
📝 Suggested comment
+ // limit results to GetMaxConcurrentTask()*3 to bound query cost while still
+ // covering enough tasks across states in a single tick.
  args := make([]any, 0, len(states)+1)
  args = append(args, states...)
  args = append(args, proto.GetMaxConcurrentTask()*3)
As per coding guidelines, `**/*.go` code "Comments SHOULD explain non-obvious intent, constraints, invariants, concurrency guarantees, SQL/compatibility contracts, or important performance trade-offs."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/dxf/framework/storage/task_table.go` around lines 443 - 449, Document the non-obvious intent behind the `proto.GetMaxConcurrentTask()*3` limit in the task query so it’s clear why this multiplier differs from the `GetMaxConcurrentTask()*2` policy described in `interface.go`. Add a concise comment near the `ExecuteSQLWithNewSession` call in `task_table.go` explaining the scheduling/performance reason for choosing `*3`, and reference the relevant task-fetching logic such as `TaskColumns`, `states`, and the unfinished-task selection query so future readers can connect the rationale to this code path.
Source: Coding guidelines
📒 Files selected for processing (2)
- pkg/dxf/framework/storage/table_test.go
- pkg/dxf/framework/storage/task_table.go
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/dxf/framework/storage/table_test.go
Actionable comments posted: 1
📒 Files selected for processing (4)
- .agents/skills/tidb-test-guidelines/references/dxf-case-map.md
- pkg/dxf/importinto/BUILD.bazel
- pkg/dxf/importinto/clean_up.go
- pkg/dxf/importinto/clean_up_test.go
✅ Files skipped from review due to trivial changes (1)
- .agents/skills/tidb-test-guidelines/references/dxf-case-map.md
func sendMeterOnCleanUpInParallel(ctx context.Context, cleanUpTasks []cleanUpTaskInfo, sendFn sendMeterOnCleanUpFunc) error {
	eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx)
	eg.SetLimit(cleanUpMeteringConcurrency)
	for _, cleanUpTask := range cleanUpTasks {
		cleanUpTask := cleanUpTask
		if !cleanUpTask.needFileCleanUp || cleanUpTask.task.State != proto.TaskStateSucceed {
			continue
		}
		eg.Go(func() error {
			logger := logutil.BgLogger().With(zap.Int64("task-id", cleanUpTask.task.ID))
			if err := sendFn(egCtx, cleanUpTask.task, logger); err != nil {
				logger.Warn("failed to send metering data on cleanup", zap.Error(err))
				return err
			}
			return nil
		})
	}
	return eg.Wait()
}
🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check whether nextgen import plans can have an empty CloudStorageURI while still succeeding.
rg -n -B3 -A10 'CloudStorageURI' pkg/dxf/importinto/planner.go
Repository: pingcap/tidb
Length of output: 2708
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect the cleanup flow and how needFileCleanUp is derived.
grep -nE 'needFileCleanUp|sendMeterOnCleanUp|CloudStorageURI|TaskStateSucceed' pkg/dxf/importinto/clean_up.go
printf '\n---\n'
sed -n '1,260p' pkg/dxf/importinto/clean_up.go
printf '\n=== planner references ===\n'
# Show the planner context around CloudStorageURI use to see whether it is optional or required.
sed -n '1,130p' pkg/dxf/importinto/planner.go
printf '\n---\n'
sed -n '390,490p' pkg/dxf/importinto/planner.go
printf '\n---\n'
sed -n '710,770p' pkg/dxf/importinto/planner.go
Repository: pingcap/tidb
Length of output: 17571
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find where CloudStorageURI is validated or defaulted, and whether nextgen imports require it.
rg -n 'CloudStorageURI|IsNextGen|GlobalSort|GetSortStore|manual.*cloud|cloud storage' pkg/dxf/importinto pkg/executor/importer
printf '\n=== task/meta definitions ===\n'
rg -n 'type TaskMeta|type PostProcessStepMeta|type Plan struct|CloudStorageURI string' pkg/dxf/importinto pkg/executor/importer
printf '\n=== focused file slices ===\n'
sed -n '1,220p' pkg/executor/importer/plan.go 2>/dev/null || true
sed -n '1,260p' pkg/executor/importer/*.go 2>/dev/null | sed -n '1,260p' || true
Repository: pingcap/tidb
Length of output: 19134
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect how Plan.CloudStorageURI affects nextgen/local-sort behavior and whether an empty value is valid.
sed -n '700,780p' pkg/executor/importer/import.go
printf '\n---\n'
sed -n '930,1010p' pkg/executor/importer/import.go
printf '\n---\n'
sed -n '1828,1855p' pkg/executor/importer/import.go
printf '\n---\n'
sed -n '180,230p' pkg/dxf/importinto/scheduler.go
printf '\n---\n'
sed -n '1,220p' pkg/executor/importer/precheck.go
Repository: pingcap/tidb
Length of output: 13151
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Show the scheduler logic around GlobalSort derivation and nextgen cleanup scheduling.
sed -n '160,240p' pkg/dxf/importinto/scheduler.go
printf '\n---\n'
sed -n '280,360p' pkg/dxf/importinto/scheduler.go
printf '\n---\n'
sed -n '600,650p' pkg/dxf/importinto/scheduler.go
Repository: pingcap/tidb
Length of output: 7357
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect import plan construction for when CloudStorageURI is empty vs defaulted, especially in nextgen.
sed -n '720,770p' pkg/executor/importer/import.go
printf '\n---\n'
sed -n '940,970p' pkg/executor/importer/import.go
printf '\n---\n'
sed -n '1240,1295p' pkg/executor/importer/import.go
printf '\n---\n'
sed -n '1,120p' pkg/executor/importer/precheck.go
Repository: pingcap/tidb
Length of output: 8183
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect the plan lifecycle and precheck rules to determine whether empty CloudStorageURI is allowed for nextgen imports.
sed -n '720,770p' pkg/executor/importer/import.go
printf '\n---\n'
sed -n '940,970p' pkg/executor/importer/import.go
printf '\n---\n'
sed -n '1248,1288p' pkg/executor/importer/import.go
printf '\n---\n'
sed -n '1,190p' pkg/executor/importer/precheck.go
Repository: pingcap/tidb
Length of output: 9242
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# See what metric data SendRowAndSizeMeterData represents and where else it is sent.
rg -n 'SendRowAndSizeMeterData\(' pkg | sed -n '1,120p'
printf '\n---\n'
sed -n '1,220p' pkg/dxf/framework/handle/*.go 2>/dev/null || true
Repository: pingcap/tidb
Length of output: 7343
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect the tests around CloudStorageURI to see whether nextgen plans can be empty or are always populated.
sed -n '150,230p' pkg/executor/importer/import_test.go
printf '\n---\n'
sed -n '620,650p' pkg/executor/importer/import_test.go
printf '\n---\n'
sed -n '360,390p' pkg/dxf/importinto/planner_test.go
Repository: pingcap/tidb
Length of output: 5687
Decouple metering from file-cleanup eligibility.
`needFileCleanUp` only tracks cloud-storage deletion, but it also gates `sendMeterOnCleanUp`, so next-gen jobs that run with an empty `CloudStorageURI` (local sort) will skip row/size reporting and undercount billing.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pkg/dxf/importinto/clean_up.go` around lines 138 - 156,
`sendMeterOnCleanUpInParallel` is incorrectly using `needFileCleanUp` to decide
whether to emit cleanup metering, which ties billing/reporting to file deletion
eligibility. Update the loop in `sendMeterOnCleanUpInParallel` so `sendFn` runs
for succeeded tasks regardless of `needFileCleanUp`, and keep file cleanup
gating separate from metering logic. Use the existing `cleanUpTaskInfo`,
`task.State`, and `sendMeterOnCleanUpFunc` flow to locate and adjust the
condition.
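The separation the comment asks for can be sketched as two independent filters over the same task list: one for metering (succeeded tasks only) and one for file cleanup. The types below are simplified stand-ins for `cleanUpTaskInfo` and the task state, and the helper names `tasksToMeter` and `tasksNeedingFileCleanUp` are hypothetical; the real fix would more likely just drop the `needFileCleanUp` check from the loop condition.

```go
package main

import "fmt"

// Simplified stand-ins for the real task types (assumptions).
type taskState string

const (
	taskStateSucceed taskState = "succeed"
	taskStateFailed  taskState = "failed"
)

type task struct {
	ID    int64
	State taskState
}

type cleanUpTaskInfo struct {
	task            *task
	needFileCleanUp bool
}

// tasksToMeter selects every succeeded task, whether or not it has
// cloud-storage files to delete.
func tasksToMeter(infos []cleanUpTaskInfo) []cleanUpTaskInfo {
	out := make([]cleanUpTaskInfo, 0, len(infos))
	for _, info := range infos {
		if info.task.State == taskStateSucceed {
			out = append(out, info)
		}
	}
	return out
}

// tasksNeedingFileCleanUp selects tasks by file-cleanup eligibility
// only, independent of the metering decision.
func tasksNeedingFileCleanUp(infos []cleanUpTaskInfo) []cleanUpTaskInfo {
	out := make([]cleanUpTaskInfo, 0, len(infos))
	for _, info := range infos {
		if info.needFileCleanUp {
			out = append(out, info)
		}
	}
	return out
}

func main() {
	infos := []cleanUpTaskInfo{
		{task: &task{ID: 1, State: taskStateSucceed}, needFileCleanUp: false}, // local sort
		{task: &task{ID: 2, State: taskStateSucceed}, needFileCleanUp: true},
		{task: &task{ID: 3, State: taskStateFailed}, needFileCleanUp: true},
	}
	for _, info := range tasksToMeter(infos) {
		fmt.Println("meter task", info.task.ID)
	}
	for _, info := range tasksNeedingFileCleanUp(infos) {
		fmt.Println("clean files for task", info.task.ID)
	}
}
```

In this sketch task 1, a succeeded task with no cloud-storage files, is still metered, which is the case the original condition skipped.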
@D3Hunter: The following tests failed, say
What problem does this PR solve?
Issue Number: ref #61702
Problem Summary:
The nextgen DXF service needs runtime controls for the shared task scheduler, and the scheduler should avoid repeatedly concentrating max-node-limited tasks on the first eligible nodes. Finished-task cleanup also needs to avoid loading an unbounded number of tasks in one cleanup tick.
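One way to avoid always landing on the first eligible nodes is to choose a random subset of size `MaxNodeCount` instead of truncating the list. The sketch below illustrates that idea only; it is not necessarily how this PR implements it. The function `pickNodes`, its injected `shuffle` parameter, and the treatment of a non-positive limit as "no limit" are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"math/rand"
)

// pickNodes returns at most maxNodeCount nodes. The shuffle function is
// injected so callers can pass rand.Shuffle in production-like code and
// a deterministic permutation in tests. A non-positive maxNodeCount is
// treated as "no limit" here, which is an assumption of this sketch.
func pickNodes(nodes []string, maxNodeCount int, shuffle func(n int, swap func(i, j int))) []string {
	// Copy first so the caller's slice order is never modified.
	picked := append([]string(nil), nodes...)
	if maxNodeCount <= 0 || maxNodeCount >= len(picked) {
		return picked
	}
	shuffle(len(picked), func(i, j int) {
		picked[i], picked[j] = picked[j], picked[i]
	})
	return picked[:maxNodeCount]
}

func main() {
	nodes := []string{"tidb-0", "tidb-1", "tidb-2", "tidb-3", "tidb-4"}
	// Each scheduler step draws a fresh subset, so load is not pinned
	// to tidb-0 and tidb-1 as it would be with plain truncation.
	for step := 1; step <= 3; step++ {
		fmt.Println("step", step, pickNodes(nodes, 2, rand.Shuffle))
	}
}
```

Shuffling a copy and then truncating gives every eligible node the same chance of selection on each step, at the cost of an O(n) copy per call.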
What changed and how does it work?
This PR:
`MaxNodeCount` eligible nodes before generating subtasks for the next scheduler step;
Check List
Tests
Manual test:
git diff --check upstream/release-nextgen-202603...HEAD
Side effects
Documentation
Release note
Please refer to Release Notes Language Style Guide to write a quality release note.
Summary by CodeRabbit
New Features
GET/POST /dxf/task/max_concurrent).
Bug Fixes
`MaxNodeCount` applies (instead of deterministic truncation).
Tests