Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 112 additions & 55 deletions src/v/cloud_topics/level_zero/batcher/batcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include <chrono>
#include <exception>
#include <limits>
#include <variant>

using namespace std::chrono_literals;
Expand Down Expand Up @@ -237,72 +238,128 @@ ss::future<std::expected<std::monostate, errc>> batcher<Clock>::run_once(
template<class Clock>
ss::future<> batcher<Clock>::bg_controller_loop() {
auto h = _gate.hold();
bool more_work = false;
while (!_as.abort_requested()) {
if (!more_work) {
auto wait_res = co_await _stage.wait_next(&_as);
if (!wait_res.has_value()) {
// Shutting down
vlog(
_logger.info,
"Batcher upload loop is shutting down {}",
wait_res.error());
co_return;
}
auto wait_res = co_await _stage.wait_next(&_as);
if (!wait_res.has_value()) {
vlog(
_logger.info,
"Batcher upload loop is shutting down {}",
wait_res.error());
co_return;
}
if (_as.abort_requested()) {
vlog(_logger.info, "Batcher upload loop is shutting down");
co_return;
}

// Acquire semaphore units to limit concurrent background fibers.
// This blocks until a slot is available.
auto units_fut = co_await ss::coroutine::as_future(
ss::get_units(_upload_sem, 1, _as));
// Pull all available write requests at once.
auto all = _stage.pull_write_requests(
std::numeric_limits<size_t>::max(),
std::numeric_limits<size_t>::max());
Comment thread
Lazin marked this conversation as resolved.

auto list = _stage.pull_write_requests(
config::shard_local_cfg()
.cloud_topics_produce_batching_size_threshold(),
config::shard_local_cfg()
.cloud_topics_produce_cardinality_threshold());

bool complete = list.complete;
if (all.requests.empty()) {
continue;
}

if (units_fut.failed()) {
auto ex = units_fut.get_exception();
vlog(_logger.info, "Batcher upload loop is shutting down: {}", ex);
co_return;
// Calculate total size and split into evenly-sized chunks,
// each close to the configured threshold.
size_t total_size = 0;
for (const auto& wr : all.requests) {
total_size += wr.size_bytes();
}
auto units = std::move(units_fut.get());

// We can spawn the work in the background without worrying about memory
// usage because the background fibers is holding units acquired above.
ssx::spawn_with_gate(
_gate,
[this, list = std::move(list), units = std::move(units)]() mutable {
return run_once(std::move(list))
.then([this](std::expected<std::monostate, errc> res) {
if (!res.has_value()) {
if (res.error() == errc::shutting_down) {
vlog(
_logger.info,
"Batcher upload loop is shutting down");
} else {
vlog(
_logger.info,
"Batcher upload loop error: {}",
res.error());

auto threshold = config::shard_local_cfg()
.cloud_topics_produce_batching_size_threshold();

// If we will allow every chunk to be 'threshold' size
// then the last chunk in the list has an opportunity to be
// much smaller than the rest. Consider a case when the threshold
// is 4MiB and we got 8MiB + 1KiB in one iteration. If we will
// upload two 4MiB objects we will have to upload 1KiB object next.
// The solution is to allow upload size to deviate more if it allows
// us to avoid overly small objects (which will impact TCO).
//
// The computation below can yield small target_chunk_size in case
// if total_size is below the threshold. If the total_size exceeds
// the threshold the target_chunk_size can either overshoot the
// threshold or undershoot. In both cases the error is bounded by
// about 50%. The largest error is an overshoot in case if
// the total_size is 6MiB - 1 byte and the threshold is 4MiB. The
// num_chunks in this case is 1 and the target_chunk_size is approx.
// 6MiB which is 2MiB over the threshold (50% of 4MiB threshold).
// If the total_size is 6MiB the num_chunks will be 2 and the
// target_chunk_size will be 3MiB which is 1MiB below the threshold
// (or 25% of 4MiB).
Comment thread
oleiman marked this conversation as resolved.
size_t num_chunks = std::max(
size_t{1}, (total_size + threshold / 2) / threshold);
size_t target_chunk_size = total_size / num_chunks;
Comment on lines +293 to +295
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cloud_topics_produce_batching_size_threshold() has no minimum bound (it’s a plain property<size_t>), so it can be configured to 0. In that case the num_chunks computation will divide by zero and crash the batcher loop. Please add an explicit guard for threshold == 0 (fallback value + log) before calculating num_chunks/target_chunk_size.

Copilot uses AI. Check for mistakes.

vlog(
_logger.trace,
"Splitting {} ({} bytes) into {} chunks, target chunk size: {} "
"({})",
human::bytes(total_size),
total_size,
num_chunks,
human::bytes(target_chunk_size),
target_chunk_size);

for (size_t i = 0; i < num_chunks && !all.requests.empty(); ++i) {
auto units_fut = co_await ss::coroutine::as_future(
ss::get_units(_upload_sem, 1, _as));

Comment on lines +307 to +310
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The loop waits for _upload_sem after pulling all requests out of the pipeline. If the semaphore is saturated, requests that are already in all.requests can sit here for a long time and bypass the pipeline’s remove_timed_out_requests() path, so they may get uploaded/acked even after expiration_time has passed. Consider acquiring upload units before pulling (as before), or explicitly dropping/acking expired requests while they’re buffered in all.requests (including re-checking before each chunk).

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kind of in this vein, i wonder if it would be easier to add the ability to "peek" into pending requests (rather than pulling all of them) and use the total size or whatever to compute a target size & cardinality for the actual pull, then leave the rest of the loop body more or less as it was before.

if (units_fut.failed()) {
auto ex = units_fut.get_exception();
vlog(
_logger.info, "Batcher upload loop is shutting down: {}", ex);
co_return;
}
auto units = std::move(units_fut.get());

// Build a chunk by moving requests from the pulled list
// until we reach the target size (unless it's the last
// chunk, upload everything in this case).
typename write_pipeline<Clock>::write_requests_list chunk(
all._parent, all._ps);

size_t chunk_size = 0;
bool is_last = (i == num_chunks - 1);
while (!all.requests.empty()) {
auto& wr = all.requests.front();
auto sz = wr.size_bytes();
chunk_size += sz;
wr._hook.unlink();
chunk.requests.push_back(wr);
// Allow the last chunk to be larger than the target
// to avoid small objects.
if (!is_last && chunk_size >= target_chunk_size) {
break;
}
}

ssx::spawn_with_gate(
_gate,
[this,
chunk = std::move(chunk),
units = std::move(units)]() mutable {
return run_once(std::move(chunk))
.then([this](std::expected<std::monostate, errc> res) {
if (!res.has_value()) {
if (res.error() == errc::shutting_down) {
vlog(
_logger.info,
"Batcher upload loop is shutting down");
} else {
vlog(
_logger.info,
"Batcher upload loop error: {}",
res.error());
}
}
}
})
.finally([u = std::move(units)] {});
});

// The work is spawned in the background so we can grab data for the
// next L0 object. If complete==true, all pending requests were pulled,
// so wait for more. If complete==false, there are more pending
// requests.
more_work = !complete;
})
.finally([u = std::move(units)] {});
});
}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_topics/level_zero/batcher/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ redpanda_cc_gtest(
"//src/v/storage:record_batch_utils",
"//src/v/test_utils:gtest",
"//src/v/test_utils:random_bytes",
"//src/v/test_utils:scoped_config",
"@googletest//:gtest",
"@seastar",
],
Expand Down
81 changes: 78 additions & 3 deletions src/v/cloud_topics/level_zero/batcher/tests/batcher_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "remote_mock.h"
#include "storage/record_batch_builder.h"
#include "test_utils/random_bytes.h"
#include "test_utils/scoped_config.h"
#include "test_utils/test.h"

#include <seastar/core/abort_source.hh>
Expand Down Expand Up @@ -352,6 +353,80 @@ TEST_CORO(batcher_test, expired_write_request) {
}
}

// TODO: add more tests
// - behaviour in case if pending write request sizes exceed L0 object size
// limit
TEST_CORO(batcher_test, chunk_splitting_balances_upload_sizes) {
    // Lower the batching threshold so the generated data is split across
    // several uploads.
    scoped_config config_override;
    config_override.get("cloud_topics_produce_batching_size_threshold")
      .set_value(size_t{4096});

    remote_mock remote;
    remote.expect_upload_object_repeatedly();

    cloud_storage_clients::bucket_name bucket("foo");
    cloud_topics::l0::write_pipeline<ss::manual_clock> pipeline;
    static_cluster_services cluster_services;
    cloud_topics::l0::batcher<ss::manual_clock> batcher(
      pipeline.register_write_pipeline_stage(),
      bucket,
      remote,
      &cluster_services);
    cloud_topics::l0::write_pipeline_accessor pipeline_accessor{
      .pipeline = &pipeline,
    };

    // Enqueue several write requests before the batcher runs. Each request
    // carries one batch with 10 records (~3KB serialized), so six requests
    // together (~18KB) exceed the 4096-byte threshold several times over
    // and should be split into multiple balanced chunks.
    constexpr int request_count = 6;
    std::vector<ss::future<std::expected<
      chunked_vector<cloud_topics::extent_meta>,
      std::error_code>>>
      pending_writes;

    const auto expiry = ss::manual_clock::now() + 10s;

    for (int r = 0; r < request_count; r++) {
        auto [_, records, batches] = get_random_batches(1, 10);
        pending_writes.push_back(pipeline.write_and_debounce(
          model::controller_ntp, min_epoch, std::move(batches), expiry));
    }

    // Make sure every request is staged in the pipeline before the batcher
    // starts. subscribe() inspects pre-existing pending data, so wait_next
    // in bg_controller_loop will observe all requests in a single wakeup.
    co_await sleep_until(10ms, [&] {
        return pipeline_accessor.write_requests_pending(request_count);
    });

    // Kick off the batcher — its loop pulls all six requests at once and
    // carves them into evenly sized chunks.
    co_await batcher.start();

    // Each chunk is processed via spawn_with_gate, which eventually sets
    // the promise on every staged write request.
    auto outcomes = co_await ss::when_all_succeed(std::move(pending_writes));
    for (auto& outcome : outcomes) {
        ASSERT_TRUE_CORO(outcome.has_value());
    }

    co_await batcher.stop();

    // Total data exceeds the threshold, so more than one upload is expected.
    ASSERT_GT_CORO(remote.payloads.size(), size_t{1});

    // The chunks should be balanced: no single upload may be tiny compared
    // to the mean upload size.
    size_t bytes_uploaded = 0;
    for (const auto& payload : remote.payloads) {
        bytes_uploaded += payload.size();
    }
    size_t avg_size = bytes_uploaded / remote.payloads.size();
    for (size_t i = 0; i < remote.payloads.size(); i++) {
        EXPECT_GE(remote.payloads[i].size(), avg_size / 3)
          << "Upload " << i << " size " << remote.payloads[i].size()
          << " is too small relative to average " << avg_size;
    }
}
13 changes: 13 additions & 0 deletions src/v/cloud_topics/level_zero/batcher/tests/remote_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,19 @@ class remote_mock final : public cloud_io::remote_api<ss::manual_clock> {
ss::make_ready_future<cloud_io::upload_result>(res)));
}

/// Accept any number of upload_object calls, recording keys and payloads.
void expect_upload_object_repeatedly(
cloud_io::upload_result res = cloud_io::upload_result::success) {
EXPECT_CALL(*this, upload_object(::testing::_))
.WillRepeatedly(
[this,
res](const cloud_io::basic_upload_request<ss::manual_clock>& req) {
keys.push_back(req.transfer_details.key);
payloads.push_back(iobuf_to_bytes(req.payload));
return ss::make_ready_future<cloud_io::upload_result>(res);
});
}

static std::deque<ss::sstring>
convert_bytes_to_string(const chunked_vector<bytes>& expected) {
std::deque<ss::sstring> result;
Expand Down
8 changes: 8 additions & 0 deletions src/v/cloud_topics/level_zero/common/level_zero_probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,14 @@ void write_request_scheduler_probe::setup_internal_metrics(bool disable) {
"active_groups",
[this] { return _active_groups; },
sm::description("Number of active upload groups in the scheduler."),
labels),

sm::make_gauge(
"next_stage_bytes",
[this] { return _next_stage_bytes; },
sm::description(
"Bytes buffered in the next pipeline stage, used for "
"group split/merge decisions."),
Comment thread
oleiman marked this conversation as resolved.
labels)});
}
read_merge_probe::read_merge_probe(bool disable) {
Expand Down
4 changes: 4 additions & 0 deletions src/v/cloud_topics/level_zero/common/level_zero_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class write_request_scheduler_probe {

void set_active_groups(uint64_t count) { _active_groups = count; }

    /// Record the number of bytes buffered in the next pipeline stage;
    /// the value is exported via the "next_stage_bytes" gauge.
    void set_next_stage_bytes(uint64_t bytes) { _next_stage_bytes = bytes; }

private:
void setup_internal_metrics(bool disable);

Expand All @@ -128,6 +130,8 @@ class write_request_scheduler_probe {
uint64_t _rx_bytes_xshard{0};
/// Number of active upload groups
uint64_t _active_groups{0};
/// Bytes buffered in the next pipeline stage
uint64_t _next_stage_bytes{0};

metrics::internal_metric_groups _metrics;
};
Expand Down
3 changes: 2 additions & 1 deletion src/v/cloud_topics/level_zero/pipeline/read_pipeline.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ read_pipeline<Clock>::get_fetch_requests(
el._hook.unlink();
result.requests.push_back(el);
}
result.complete = pending.empty();
result.complete = std::none_of(
it, pending.end(), [stage](const auto& r) { return r.stage == stage; });
Comment thread
Lazin marked this conversation as resolved.
vlog(
logger.debug,
"get_fetch_requests returned {} requests which are querying {} ({}B)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,3 +516,44 @@ TEST_CORO(write_pipeline_test, max_requests_limit) {

ASSERT_TRUE_CORO(accessor.write_requests_pending(0));
}

TEST_CORO(write_pipeline_test, enqueue_foreign_request_accounts_bytes) {
    // enqueue_foreign_request has to account the request's bytes against
    // the destination stage. This accounting was missing at some point,
    // which made the scheduler underreport cross-shard work.
    cloud_topics::l0::write_pipeline<ss::manual_clock> pipeline;

    auto source_stage = pipeline.register_write_pipeline_stage();
    auto sink_stage = pipeline.register_write_pipeline_stage();

    ASSERT_EQ_CORO(pipeline.stage_bytes(sink_stage.id()), 0);

    const auto expiration = ss::manual_clock::now() + 10s;

    // Produce a small serialized chunk (one batch, five records).
    auto build_chunk =
      [&]() -> ss::future<cloud_topics::l0::serialized_chunk> {
        chunked_vector<model::record_batch> staged;
        auto generated = co_await model::test::make_random_batches(
          {.count = 1, .records = 5});
        std::ranges::move(std::move(generated), std::back_inserter(staged));
        co_return co_await cloud_topics::l0::serialize_batches(
          std::move(staged));
    };

    auto chunk = co_await build_chunk();
    auto request
      = std::make_unique<cloud_topics::l0::write_request<ss::manual_clock>>(
        model::controller_ntp, min_epoch, std::move(chunk), expiration);
    const auto request_bytes = request->size_bytes();

    // The bytes must show up at the next stage right after enqueueing.
    source_stage.enqueue_foreign_request(*request, false);
    ASSERT_EQ_CORO(pipeline.stage_bytes(sink_stage.id()), request_bytes);

    // Pulling from the destination stage releases the accounted bytes.
    auto pulled = sink_stage.pull_write_requests(
      std::numeric_limits<size_t>::max());
    ASSERT_EQ_CORO(pulled.requests.size(), 1);
    ASSERT_EQ_CORO(pipeline.stage_bytes(sink_stage.id()), 0);

    pulled.requests.front().set_value(
      chunked_vector<cloud_topics::extent_meta>{});
}
Loading
Loading