
ct: Fix L0 object size distribution #29781

Merged
rockwotj merged 6 commits into redpanda-data:dev from
Lazin:ct/batcher-l0-size-fix
Mar 11, 2026

Conversation

@Lazin
Contributor

@Lazin Lazin commented Mar 10, 2026

Tweak the batcher a bit. Currently the batcher uploads a lot of small objects. There are a few reasons for that.

  • The 'complete' flag returned by the pull_write_request method is not computed correctly. The method doesn't take the stage into account, so the batcher always tries to iterate one more time even if there is no work or the requests are not fully deposited (the write_request_scheduler started to send requests but has not finished yet).
  • There is a work-accounting bug in the write_request_scheduler. When the scheduler proxies a request it subtracts the byte count from the current shard but doesn't add the bytes to the target shard. In some cases this triggers an overflow that looks like a very large batcher backlog, which causes the scheduler to split groups aggressively. With more groups than needed, the batcher uploads small objects.
  • Finally, the batcher itself may upload small objects depending on the workload. E.g. if the total size of all requests sent to the batcher is 4.1MiB, it will upload a 4MiB object (the target size) plus a 100KiB object (the remainder). The last commit fixes this by forcing the batcher to group data more, so there are no small uploads unless the batcher is given very little data.
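The rounding rule the last bullet describes can be sketched as a pair of free functions (a standalone illustration of the arithmetic, not the batcher's actual code; the names are hypothetical):

```cpp
#include <algorithm>
#include <cstddef>

// Round the chunk count to the nearest integer so a small tail is folded
// into the other chunks instead of becoming a tiny trailing upload.
// 'threshold' stands in for the 4MiB target object size.
size_t chunk_count(size_t total_size, size_t threshold) {
    return std::max(size_t{1}, (total_size + threshold / 2) / threshold);
}

size_t chunk_size(size_t total_size, size_t threshold) {
    return total_size / chunk_count(total_size, threshold);
}
```

With a 4MiB threshold, 4.1MiB of pending data yields a single ~4.1MiB upload rather than a 4MiB object followed by a 100KiB one.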

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v25.3.x
  • v25.2.x
  • v25.1.x

Release Notes

  • none

Lazin added 2 commits March 9, 2026 11:26
when estimating the remaining work. Previously, the 'complete' flag was
computed incorrectly: it was based on the total size of all requests at
all stages. This commit fixes that so the 'complete' flag only takes
'stage' requests into account.

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
The metric tracks the size of the backlog of the next stage for every
group.

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
@Lazin Lazin requested review from Copilot and dotnwat March 10, 2026 09:56
@Lazin Lazin requested a review from oleiman March 10, 2026 09:56
@Lazin Lazin changed the title from "Ct/batcher l0 size fix" to "ct: Fix L0 object size distribution" Mar 10, 2026
Contributor

Copilot AI left a comment


Pull request overview

This PR adjusts the L0 write path scheduling and batching logic to reduce unnecessary small-object uploads, fix pipeline “complete” reporting, and correct cross-shard accounting that could inflate perceived backlog and trigger overly aggressive group splitting.

Changes:

  • Fix complete computation in read/write pipelines so it reflects whether this stage still has pending work (not whether the whole pending list is empty).
  • Fix cross-shard proxied write request accounting by adding bytes to the target shard’s next stage, and add a scheduler metric to observe next-stage bytes used in split/merge decisions.
  • Rework the batcher background loop to pull all available requests and split uploads into near-threshold chunks to avoid tiny tail uploads.

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.

Show a summary per file
File — Description
src/v/cloud_topics/level_zero/write_request_scheduler/write_request_scheduler.h — Extends the try_schedule_upload API to accept a scheduler probe.
src/v/cloud_topics/level_zero/write_request_scheduler/write_request_scheduler.cc — Emits the next-stage backlog metric, adjusts the split threshold, wires the probe through the scheduler tick.
src/v/cloud_topics/level_zero/write_request_scheduler/tests/write_request_scheduler_test.cc — Updates tests to pass the new probe argument.
src/v/cloud_topics/level_zero/pipeline/write_pipeline.cc — Fixes stage-scoped complete computation; adds next-stage byte accounting for foreign (proxied) requests.
src/v/cloud_topics/level_zero/pipeline/read_pipeline.cc — Fixes stage-scoped complete computation for read requests.
src/v/cloud_topics/level_zero/common/level_zero_probe.h — Adds a next_stage_bytes gauge to the scheduler probe.
src/v/cloud_topics/level_zero/common/level_zero_probe.cc — Registers the new next_stage_bytes metric.
src/v/cloud_topics/level_zero/batcher/batcher.cc — Changes background batching to pull all requests and split into near-threshold chunks.

Comment on lines +307 to +310
for (size_t i = 0; i < num_chunks && !all.requests.empty(); ++i) {
auto units_fut = co_await ss::coroutine::as_future(
ss::get_units(_upload_sem, 1, _as));


Copilot AI Mar 10, 2026


The loop waits for _upload_sem after pulling all requests out of the pipeline. If the semaphore is saturated, requests that are already in all.requests can sit here for a long time and bypass the pipeline’s remove_timed_out_requests() path, so they may get uploaded/acked even after expiration_time has passed. Consider acquiring upload units before pulling (as before), or explicitly dropping/acking expired requests while they’re buffered in all.requests (including re-checking before each chunk).

Member


kind of in this vein, i wonder if it would be easier to add the ability to "peek" into pending requests (rather than pulling all of them) and use the total size or whatever to compute a target size & cardinality for the actual pull, then leave the rest of the loop body more or less as it was before.

Comment on lines +293 to +295
size_t num_chunks = std::max(
size_t{1}, (total_size + threshold / 2) / threshold);
size_t target_chunk_size = total_size / num_chunks;

Copilot AI Mar 10, 2026


cloud_topics_produce_batching_size_threshold() has no minimum bound (it’s a plain property<size_t>), so it can be configured to 0. In that case the num_chunks computation will divide by zero and crash the batcher loop. Please add an explicit guard for threshold == 0 (fallback value + log) before calculating num_chunks/target_chunk_size.
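A minimal sketch of the guard this comment suggests, assuming a 4MiB fallback (kDefaultThreshold and safe_num_chunks are illustrative names, not the actual config default or batcher code):

```cpp
#include <algorithm>
#include <cstddef>

// Hypothetical guard: if the configured threshold is 0 (the property has
// no minimum bound), fall back to a default before dividing.
constexpr size_t kDefaultThreshold = 4 * 1024 * 1024; // 4MiB, illustrative

size_t safe_num_chunks(size_t total_size, size_t threshold) {
    if (threshold == 0) {
        threshold = kDefaultThreshold; // avoid division by zero below
    }
    return std::max(size_t{1}, (total_size + threshold / 2) / threshold);
}
```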

@vbotbuildovich
Collaborator

vbotbuildovich commented Mar 10, 2026

Retry command for Build#81534

please wait until all jobs are finished before running the slash command

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
tests/rptest/tests/cloud_topics/l0_gc_test.py::CloudTopicsL0GCNodeFailureTest.test_node_failure_mid_gc@{"cloud_storage_type":2}
tests/rptest/tests/cloud_topics/l0_gc_test.py::CloudTopicsL0GCNodeFailureTest.test_node_failure_mid_gc@{"cloud_storage_type":1}

@vbotbuildovich
Collaborator

vbotbuildovich commented Mar 10, 2026

CI test results

test results on build#81534
test_class test_method test_arguments test_kind job_url test_status passed reason test_history
CloudTopicsL0GCNodeFailureTest test_node_failure_mid_gc {"cloud_storage_type": 2} integration https://buildkite.com/redpanda/redpanda/builds/81534#019cd73a-3742-4c2c-a986-4f63f1a96333 FLAKY 4/11 Test FAILS after retries. Significant increase in flaky rate (baseline=0.0283, p0=0.0000, reject_threshold=0.0100) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=CloudTopicsL0GCNodeFailureTest&test_method=test_node_failure_mid_gc
CloudTopicsL0GCNodeFailureTest test_node_failure_mid_gc {"cloud_storage_type": 1} integration https://buildkite.com/redpanda/redpanda/builds/81534#019cd739-be61-4cfb-8db5-bf665af4736d FLAKY 3/11 Test FAILS after retries. Significant increase in flaky rate (baseline=0.0283, p0=0.0000, reject_threshold=0.0100) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=CloudTopicsL0GCNodeFailureTest&test_method=test_node_failure_mid_gc
test results on build#81543
test_class test_method test_arguments test_kind job_url test_status passed reason test_history
CloudTopicsL0GCNodeFailureTest test_node_failure_mid_gc {"cloud_storage_type": 2} integration https://buildkite.com/redpanda/redpanda/builds/81543#019cd86f-8868-48b1-ae5d-a5d0681ce2f1 FLAKY 2/11 Test FAILS after retries. Significant increase in flaky rate (baseline=0.0290, p0=0.0000, reject_threshold=0.0100) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=CloudTopicsL0GCNodeFailureTest&test_method=test_node_failure_mid_gc
CloudTopicsL0GCNodeFailureTest test_node_failure_mid_gc {"cloud_storage_type": 2} integration https://buildkite.com/redpanda/redpanda/builds/81543#019cd86f-886c-422d-b793-0398a3eefa2b FLAKY 1/11 Test FAILS after retries. Significant increase in flaky rate (baseline=0.0290, p0=0.0000, reject_threshold=0.0100) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=CloudTopicsL0GCNodeFailureTest&test_method=test_node_failure_mid_gc
test results on build#81569
test_class test_method test_arguments test_kind job_url test_status passed reason test_history
DatalakeCustomPartitioningTest test_spec_evolution_rpcn {"catalog_type": "rest_jdbc", "cloud_storage_type": 1} integration https://buildkite.com/redpanda/redpanda/builds/81569#019cd982-28ae-4565-a3c4-bb2b0b3a02a7 FLAKY 10/11 Test PASSES after retries. No significant increase in flaky rate (baseline=0.0000, p0=1.0000, reject_threshold=0.0100; adj_baseline=0.1000, p1=0.3487, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=DatalakeCustomPartitioningTest&test_method=test_spec_evolution_rpcn
test results on build#81620
test_class test_method test_arguments test_kind job_url test_status passed reason test_history
WriteCachingFailureInjectionE2ETest test_crash_all {"use_transactions": false} integration https://buildkite.com/redpanda/redpanda/builds/81620#019cdd1b-a0dd-48af-bd3d-f4a063d86a69 FLAKY 16/21 Test PASSES after retries. No significant increase in flaky rate (baseline=0.0907, p0=0.1016, reject_threshold=0.0100; adj_baseline=0.2482, p1=0.4222, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=WriteCachingFailureInjectionE2ETest&test_method=test_crash_all

@Lazin
Contributor Author

Lazin commented Mar 10, 2026

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
tests/rptest/tests/cloud_topics/l0_gc_test.py::CloudTopicsL0GCNodeFailureTest.test_node_failure_mid_gc@{"cloud_storage_type":2}
tests/rptest/tests/cloud_topics/l0_gc_test.py::CloudTopicsL0GCNodeFailureTest.test_node_failure_mid_gc@{"cloud_storage_type":1}

@vbotbuildovich
Collaborator

vbotbuildovich commented Mar 10, 2026

Retry command for Build#81543

please wait until all jobs are finished before running the slash command

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
tests/rptest/tests/cloud_topics/l0_gc_test.py::CloudTopicsL0GCNodeFailureTest.test_node_failure_mid_gc@{"cloud_storage_type":2}

@dotnwat dotnwat requested a review from rockwotj March 10, 2026 18:22
Contributor

@rockwotj rockwotj left a comment


I am concerned that all these really quite subtle issues are being addressed here with no unit tests to ensure we don't regress or that this logic all works properly

@Lazin Lazin force-pushed the ct/batcher-l0-size-fix branch from 73bc424 to a7a57ab Compare March 10, 2026 20:30
@Lazin
Contributor Author

Lazin commented Mar 10, 2026

I am concerned that all these really quite subtle issues are being addressed here with no unit tests to ensure we don't regress or that this logic all works properly

I added some tests

@Lazin Lazin requested a review from rockwotj March 10, 2026 20:32
Member

@oleiman oleiman left a comment


looks reasonable. few comments & questions

[this] { return _next_stage_bytes; },
sm::description(
"Bytes buffered in the next pipeline stage, used for "
"group split/merge decisions."),
Member


tracks the size of the backlog of the next stage for every group.

does this mean "all active groups"?

Contributor Author


no, this is only tracked on a leader shard of the group, and it includes all batchers from all shards that belong to the group

Member


cool, that's how it reads. I'm a little confused about how this works w/ labeling and such, but I think I need to read up on the terminology a bit.

    probe.set_next_stage_bytes(next_stage_bytes);
    // Calculate dynamic split threshold based on group size
-   size_t group_split_threshold = max_buffer_size * grp_size;
+   size_t group_split_threshold = 2 * max_buffer_size * grp_size;
Member


q: what's the rationale for this factor of two? appears in a couple of places, so maybe worth a bit of explanation in the commit message.

Contributor Author


The particular number is not very important here. I increased the size of the backlog that triggers the group split to make it a bit more conservative. If the group can't keep up it will still reach 2x or 3x or whatever the multiplier is; it will just take a bit more time. The reason I think this backlog size is better is small groups: if a group is small (2 shards), both shards upload more frequently, so it's quite easy for such a group to get split during a relatively small spike in activity. This makes smaller groups unstable.

Lazin added 4 commits March 11, 2026 09:19
The write request scheduler accounts for the x-shard work
incorrectly. Because of that it often splits groups aggressively, which
causes uploaded L0 objects to be smaller than necessary.

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
Allow the L0 object size to deviate more to avoid small objects.

If we allow every chunk to be 'threshold' sized, the last chunk in
the list can end up much smaller than the rest. Consider a case where
the threshold is 4MiB and we got 8MiB + 1KiB in one iteration. If we
upload two 4MiB objects we will have to upload a 1KiB object next.
The solution is to let the upload size deviate more when that avoids
overly small objects (which would impact TCO).

The computation below can yield a small target_chunk_size when
total_size is below the threshold. If total_size exceeds the
threshold, target_chunk_size can either overshoot or undershoot
the threshold. In both cases the error is bounded by about 50%.
The largest error is an overshoot when total_size is 6MiB - 1 byte
and the threshold is 4MiB: num_chunks is 1 and target_chunk_size is
approx. 6MiB, which is 2MiB over the threshold (50% of the 4MiB
threshold). If total_size is 6MiB, num_chunks will be 2 and
target_chunk_size will be 3MiB, which is 1MiB below the threshold
(or 25% of 4MiB).
Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
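The bounds this commit message describes can be checked with a small sketch (a free-standing restatement of the rounding rule under the stated 4MiB threshold; num_chunks and target_chunk_size here are illustrative, not the batcher's actual code):

```cpp
#include <algorithm>
#include <cstddef>

constexpr size_t MiB = 1024 * 1024;

// Nearest-integer rounding of total_size / threshold, with a floor of 1.
size_t num_chunks(size_t total_size, size_t threshold) {
    return std::max(size_t{1}, (total_size + threshold / 2) / threshold);
}

size_t target_chunk_size(size_t total_size, size_t threshold) {
    return total_size / num_chunks(total_size, threshold);
}
```

For total_size = 6MiB - 1 and threshold = 4MiB, num_chunks is 1 and the single chunk is ~6MiB (2MiB over the threshold); for total_size = 6MiB, num_chunks is 2 and each chunk is 3MiB (1MiB under), matching the worked examples above.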
The test works correctly but sometimes fails to finish producing data in
time.

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
The test checks L0 object size distribution.

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
@Lazin Lazin force-pushed the ct/batcher-l0-size-fix branch from a7a57ab to e57427a Compare March 11, 2026 13:21
Member

@oleiman oleiman left a comment


lgtm

@rockwotj rockwotj merged commit 95d2011 into redpanda-data:dev Mar 11, 2026
21 checks passed
