-
Notifications
You must be signed in to change notification settings - Fork 731
ct: Fix L0 object size distribution #29781
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
4f80f56
51933b6
a64227b
5112f61
0b3e512
e57427a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,6 +28,7 @@ | |
|
|
||
| #include <chrono> | ||
| #include <exception> | ||
| #include <limits> | ||
| #include <variant> | ||
|
|
||
| using namespace std::chrono_literals; | ||
|
|
@@ -237,72 +238,128 @@ ss::future<std::expected<std::monostate, errc>> batcher<Clock>::run_once( | |
| template<class Clock> | ||
| ss::future<> batcher<Clock>::bg_controller_loop() { | ||
| auto h = _gate.hold(); | ||
| bool more_work = false; | ||
| while (!_as.abort_requested()) { | ||
| if (!more_work) { | ||
| auto wait_res = co_await _stage.wait_next(&_as); | ||
| if (!wait_res.has_value()) { | ||
| // Shutting down | ||
| vlog( | ||
| _logger.info, | ||
| "Batcher upload loop is shutting down {}", | ||
| wait_res.error()); | ||
| co_return; | ||
| } | ||
| auto wait_res = co_await _stage.wait_next(&_as); | ||
| if (!wait_res.has_value()) { | ||
| vlog( | ||
| _logger.info, | ||
| "Batcher upload loop is shutting down {}", | ||
| wait_res.error()); | ||
| co_return; | ||
| } | ||
| if (_as.abort_requested()) { | ||
| vlog(_logger.info, "Batcher upload loop is shutting down"); | ||
| co_return; | ||
| } | ||
|
|
||
| // Acquire semaphore units to limit concurrent background fibers. | ||
| // This blocks until a slot is available. | ||
| auto units_fut = co_await ss::coroutine::as_future( | ||
| ss::get_units(_upload_sem, 1, _as)); | ||
| // Pull all available write requests at once. | ||
| auto all = _stage.pull_write_requests( | ||
| std::numeric_limits<size_t>::max(), | ||
| std::numeric_limits<size_t>::max()); | ||
|
|
||
| auto list = _stage.pull_write_requests( | ||
| config::shard_local_cfg() | ||
| .cloud_topics_produce_batching_size_threshold(), | ||
| config::shard_local_cfg() | ||
| .cloud_topics_produce_cardinality_threshold()); | ||
|
|
||
| bool complete = list.complete; | ||
| if (all.requests.empty()) { | ||
| continue; | ||
| } | ||
|
|
||
| if (units_fut.failed()) { | ||
| auto ex = units_fut.get_exception(); | ||
| vlog(_logger.info, "Batcher upload loop is shutting down: {}", ex); | ||
| co_return; | ||
| // Calculate total size and split into evenly-sized chunks, | ||
| // each close to the configured threshold. | ||
| size_t total_size = 0; | ||
| for (const auto& wr : all.requests) { | ||
| total_size += wr.size_bytes(); | ||
| } | ||
| auto units = std::move(units_fut.get()); | ||
|
|
||
| // We can spawn the work in the background without worrying about memory | ||
| // usage because the background fiber is holding the units acquired above. | ||
| ssx::spawn_with_gate( | ||
| _gate, | ||
| [this, list = std::move(list), units = std::move(units)]() mutable { | ||
| return run_once(std::move(list)) | ||
| .then([this](std::expected<std::monostate, errc> res) { | ||
| if (!res.has_value()) { | ||
| if (res.error() == errc::shutting_down) { | ||
| vlog( | ||
| _logger.info, | ||
| "Batcher upload loop is shutting down"); | ||
| } else { | ||
| vlog( | ||
| _logger.info, | ||
| "Batcher upload loop error: {}", | ||
| res.error()); | ||
|
|
||
| auto threshold = config::shard_local_cfg() | ||
| .cloud_topics_produce_batching_size_threshold(); | ||
|
|
||
| // If every chunk were allowed to be exactly 'threshold' bytes, the | ||
| // last chunk in the list could end up much smaller than the rest. | ||
| // Consider a threshold of 4MiB and 8MiB + 1KiB pulled in one | ||
| // iteration: after uploading two 4MiB objects we would have to | ||
| // upload a 1KiB object next. The solution is to let the upload size | ||
| // deviate further from the threshold when that avoids overly small | ||
| // objects (which would impact TCO). | ||
| // | ||
| // The computation below can yield a small target_chunk_size when | ||
| // total_size is below the threshold. When total_size exceeds the | ||
| // threshold, target_chunk_size can either overshoot or undershoot | ||
| // the threshold; in both cases the error is bounded by about 50%. | ||
| // The largest error is an overshoot: with a total_size of 6MiB - 1 | ||
| // byte and a 4MiB threshold, num_chunks is 1 and target_chunk_size | ||
| // is approx. 6MiB, which is 2MiB over the threshold (50% of the 4MiB | ||
| // threshold). With a total_size of exactly 6MiB, num_chunks is 2 and | ||
| // target_chunk_size is 3MiB, which is 1MiB below the threshold | ||
| // (or 25% of 4MiB). | ||
|
oleiman marked this conversation as resolved.
|
||
| size_t num_chunks = std::max( | ||
| size_t{1}, (total_size + threshold / 2) / threshold); | ||
| size_t target_chunk_size = total_size / num_chunks; | ||
|
Comment on lines
+293
to
+295
|
||
|
|
||
| vlog( | ||
| _logger.trace, | ||
| "Splitting {} ({} bytes) into {} chunks, target chunk size: {} " | ||
| "({})", | ||
| human::bytes(total_size), | ||
| total_size, | ||
| num_chunks, | ||
| human::bytes(target_chunk_size), | ||
| target_chunk_size); | ||
|
|
||
| for (size_t i = 0; i < num_chunks && !all.requests.empty(); ++i) { | ||
| auto units_fut = co_await ss::coroutine::as_future( | ||
| ss::get_units(_upload_sem, 1, _as)); | ||
|
|
||
|
Comment on lines
+307
to
+310
|
||
| if (units_fut.failed()) { | ||
| auto ex = units_fut.get_exception(); | ||
| vlog( | ||
| _logger.info, "Batcher upload loop is shutting down: {}", ex); | ||
| co_return; | ||
| } | ||
| auto units = std::move(units_fut.get()); | ||
|
|
||
| // Build a chunk by moving requests from the pulled list until the | ||
| // target size is reached. The last chunk is the exception: it takes | ||
| // every remaining request regardless of size. | ||
| typename write_pipeline<Clock>::write_requests_list chunk( | ||
| all._parent, all._ps); | ||
|
|
||
| size_t chunk_size = 0; | ||
| bool is_last = (i == num_chunks - 1); | ||
| while (!all.requests.empty()) { | ||
| auto& wr = all.requests.front(); | ||
| auto sz = wr.size_bytes(); | ||
| chunk_size += sz; | ||
| wr._hook.unlink(); | ||
| chunk.requests.push_back(wr); | ||
| // Allow the last chunk to be larger than the target | ||
| // to avoid small objects. | ||
| if (!is_last && chunk_size >= target_chunk_size) { | ||
| break; | ||
| } | ||
| } | ||
|
|
||
| ssx::spawn_with_gate( | ||
| _gate, | ||
| [this, | ||
| chunk = std::move(chunk), | ||
| units = std::move(units)]() mutable { | ||
| return run_once(std::move(chunk)) | ||
| .then([this](std::expected<std::monostate, errc> res) { | ||
| if (!res.has_value()) { | ||
| if (res.error() == errc::shutting_down) { | ||
| vlog( | ||
| _logger.info, | ||
| "Batcher upload loop is shutting down"); | ||
| } else { | ||
| vlog( | ||
| _logger.info, | ||
| "Batcher upload loop error: {}", | ||
| res.error()); | ||
| } | ||
| } | ||
| } | ||
| }) | ||
| .finally([u = std::move(units)] {}); | ||
| }); | ||
|
|
||
| // The work is spawned in the background so we can grab data for the | ||
| // next L0 object. If complete==true, all pending requests were pulled, | ||
| // so wait for more. If complete==false, there are more pending | ||
| // requests. | ||
| more_work = !complete; | ||
| }) | ||
| .finally([u = std::move(units)] {}); | ||
| }); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.