ct: Fix L0 object size distribution #29781
Conversation
when estimating the remaining work. Previously, the 'complete' flag was computed incorrectly: it was based on the total size of all requests at all stages. This commit fixes it so the 'complete' flag only takes 'stage' requests into account.

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
The metric tracks the size of the backlog of the next stage for every group.

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
Pull request overview
This PR adjusts the L0 write path scheduling and batching logic to reduce unnecessary small-object uploads, fix pipeline “complete” reporting, and correct cross-shard accounting that could inflate perceived backlog and trigger overly aggressive group splitting.
Changes:
- Fix `complete` computation in read/write pipelines so it reflects whether this stage still has pending work (not whether the whole pending list is empty).
- Fix cross-shard proxied write request accounting by adding bytes to the target shard's next stage, and add a scheduler metric to observe next-stage bytes used in split/merge decisions.
- Rework the batcher background loop to pull all available requests and split uploads into near-threshold chunks to avoid tiny tail uploads.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| src/v/cloud_topics/level_zero/write_request_scheduler/write_request_scheduler.h | Extends try_schedule_upload API to accept a scheduler probe. |
| src/v/cloud_topics/level_zero/write_request_scheduler/write_request_scheduler.cc | Emits next-stage backlog metric, adjusts split threshold, wires probe through scheduler tick. |
| src/v/cloud_topics/level_zero/write_request_scheduler/tests/write_request_scheduler_test.cc | Updates tests to pass the new probe argument. |
| src/v/cloud_topics/level_zero/pipeline/write_pipeline.cc | Fixes stage-scoped complete computation; adds next-stage byte accounting for foreign (proxied) requests. |
| src/v/cloud_topics/level_zero/pipeline/read_pipeline.cc | Fixes stage-scoped complete computation for read requests. |
| src/v/cloud_topics/level_zero/common/level_zero_probe.h | Adds next_stage_bytes gauge to the scheduler probe. |
| src/v/cloud_topics/level_zero/common/level_zero_probe.cc | Registers the new next_stage_bytes metric. |
| src/v/cloud_topics/level_zero/batcher/batcher.cc | Changes background batching to pull all requests and split into near-threshold chunks. |
```cpp
for (size_t i = 0; i < num_chunks && !all.requests.empty(); ++i) {
    auto units_fut = co_await ss::coroutine::as_future(
      ss::get_units(_upload_sem, 1, _as));
```
The loop waits for _upload_sem after pulling all requests out of the pipeline. If the semaphore is saturated, requests that are already in all.requests can sit here for a long time and bypass the pipeline’s remove_timed_out_requests() path, so they may get uploaded/acked even after expiration_time has passed. Consider acquiring upload units before pulling (as before), or explicitly dropping/acking expired requests while they’re buffered in all.requests (including re-checking before each chunk).
kind of in this vein, i wonder if it would be easier to add the ability to "peek" into pending requests (rather than pulling all of them) and use the total size or whatever to compute a target size & cardinality for the actual pull, then leave the rest of the loop body more or less as it was before.
```cpp
size_t num_chunks = std::max(
  size_t{1}, (total_size + threshold / 2) / threshold);
size_t target_chunk_size = total_size / num_chunks;
```
`cloud_topics_produce_batching_size_threshold()` has no minimum bound (it's a plain `property<size_t>`), so it can be configured to 0. In that case the `num_chunks` computation will divide by zero and crash the batcher loop. Please add an explicit guard for `threshold == 0` (fallback value + log) before calculating `num_chunks`/`target_chunk_size`.
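A minimal standalone sketch of the suggested guard (the fallback of treating a zero threshold as "one chunk" is an assumption, not the PR's actual fix):

```cpp
#include <algorithm>
#include <cstddef>

// Chunk-count computation with a guard against a zero threshold, since the
// config property has no minimum bound.
inline size_t compute_num_chunks(size_t total_size, size_t threshold) {
    if (threshold == 0) {
        return 1; // assumed fallback; the real fix should also log a warning
    }
    // Round total_size to the nearest multiple of threshold, never below 1.
    return std::max(size_t{1}, (total_size + threshold / 2) / threshold);
}
```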
rockwotj
left a comment
I am concerned that all these really quite subtle issues are being addressed here without any unit tests to ensure we don't regress and that this logic all works properly.
Force-pushed from 73bc424 to a7a57ab
I added some tests
oleiman
left a comment
looks reasonable. few comments & questions
```cpp
[this] { return _next_stage_bytes; },
sm::description(
  "Bytes buffered in the next pipeline stage, used for "
  "group split/merge decisions."),
```
> tracks the size of the backlog of the next stage for every group.

does this mean "all active groups"?
no, this is only tracked on the leader shard of the group, and it includes all batchers from all shards that belong to the group
cool, that's how it reads. I'm a little confused about how this works w/ labeling and such, but I think I need to read up on the terminology a bit.
```diff
  probe.set_next_stage_bytes(next_stage_bytes);
  // Calculate dynamic split threshold based on group size
- size_t group_split_threshold = max_buffer_size * grp_size;
+ size_t group_split_threshold = 2 * max_buffer_size * grp_size;
```
q: what's the rationale for this factor of two? appears in a couple of places, so maybe worth a bit of explanation in the commit message.
The particular number is not very important here. I increased the size of the backlog that triggers the group split to make it a bit more conservative. If the group can't keep up it will still reach 2x or 3x or whatever the multiplier is; it will just take a bit more time. The reason I think this backlog size is better is small groups. If the group is small (2 shards), both shards upload more frequently, so it's quite easy for such a group to get split during a relatively small spike in activity. This makes smaller groups unstable.
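To make the factor-of-two discussion concrete, here is the arithmetic for the smallest (2-shard) group. The 4 MiB `max_buffer_size` is an assumed value for illustration only:

```cpp
#include <cstddef>

// Illustrative arithmetic only: doubling the multiplier raises the backlog
// size that triggers a group split, so a short activity spike is less
// likely to split a small group.
constexpr size_t mib = 1024 * 1024;
constexpr size_t max_buffer_size = 4 * mib; // assumed value
constexpr size_t grp_size = 2;              // smallest, most split-prone group

constexpr size_t old_split_threshold = max_buffer_size * grp_size;     // 8 MiB
constexpr size_t new_split_threshold = 2 * max_buffer_size * grp_size; // 16 MiB
```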
The write request scheduler is accounting for the x-shard work incorrectly. Because of that it often splits groups aggressively, which causes uploaded L0 objects to be smaller than necessary.

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
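A minimal sketch of the accounting bug described above (the function and names are illustrative, not the scheduler's actual code): subtracting proxied bytes from the source shard without crediting the target shard lets the unsigned counter dip "below zero", which wraps around and reads as an enormous backlog.

```cpp
#include <cstddef>
#include <cstdint>

// Unsigned subtraction wraps modulo 2^N, so a counter driven below zero
// becomes a huge value, which the scheduler misreads as a large backlog.
inline size_t subtract_proxied(size_t shard_backlog, size_t proxied_bytes) {
    return shard_backlog - proxied_bytes; // wraps if proxied_bytes > backlog
}
```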
Allow the L0 object size to deviate more to avoid small objects. If we allow every chunk to be 'threshold'-sized, the last chunk in the list can be much smaller than the rest. Consider a case where the threshold is 4MiB and we got 8MiB + 1KiB in one iteration: if we upload two 4MiB objects, we then have to upload a 1KiB object. The solution is to allow the upload size to deviate more if that avoids overly small objects (which would impact TCO).

The computation below can yield a small target_chunk_size if total_size is below the threshold. If total_size exceeds the threshold, target_chunk_size can either overshoot or undershoot the threshold; in both cases the error is bounded by about 50%. The largest error is an overshoot when total_size is 6MiB - 1 byte and the threshold is 4MiB: num_chunks is 1 and target_chunk_size is approx. 6MiB, which is 2MiB over the threshold (50% of the 4MiB threshold). If total_size is 6MiB, num_chunks will be 2 and target_chunk_size will be 3MiB, which is 1MiB below the threshold (25% of 4MiB).

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
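The rounding described in this commit message can be checked with a small standalone sketch (the free function is illustrative, not the batcher's actual code):

```cpp
#include <algorithm>
#include <cstddef>

// Nearest-multiple rounding: the chunk count snaps to the closest multiple
// of the threshold (never below 1), so the per-chunk size can deviate up to
// ~50% from the threshold instead of producing a tiny tail object.
inline size_t target_chunk_size(size_t total_size, size_t threshold) {
    size_t num_chunks = std::max(
      size_t{1}, (total_size + threshold / 2) / threshold);
    return total_size / num_chunks;
}
```

For example, 8 MiB + 1 KiB with a 4 MiB threshold yields two chunks of roughly 4 MiB each instead of 4 MiB + 4 MiB + 1 KiB.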
The test works correctly but sometimes fails to finish producing data in time.

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
The test checks L0 object size distribution.

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
Force-pushed from a7a57ab to e57427a
Tweak the batcher a bit. Currently, the batcher uploads a lot of small objects. There are a few reasons for that.

- The 'complete' flag returned by the `pull_write_request` method is not computed correctly. The method doesn't take the stage into account, so the batcher always tries to iterate one more time even if there is no work or the requests are not fully deposited (the `write_request_scheduler` started to send requests but hasn't finished yet).
- Incorrect cross-shard accounting in the `write_request_scheduler`. When the scheduler proxies a request it subtracts the byte count from the current shard but doesn't add the bytes to the target shard. In some cases this triggers an overflow which looks like a very large batcher backlog, causing the scheduler to aggressively split groups. With more groups than needed, the batcher uploads small objects.

Backports Required
Release Notes