Skip to content

MQ-1200 Add response body to queue send() and sendBatch()#6354

Open
KennethRuan wants to merge 1 commit intocloudflare:mainfrom
KennethRuan:kruan/MQ-1200-read-response-body-from-QUEUE-send-and-sendbatch
Open

MQ-1200 Add response body to queue send() and sendBatch()#6354
KennethRuan wants to merge 1 commit intocloudflare:mainfrom
KennethRuan:kruan/MQ-1200-read-response-body-from-QUEUE-send-and-sendbatch

Conversation

@KennethRuan
Copy link

@KennethRuan KennethRuan commented Mar 18, 2026

Summary

Adds support for returning structured JSON responses from the send() and sendBatch() methods from a worker's env.QUEUE binding. The changes are gated behind the queue_send_response_body/no_queue_send_response_body flag.

Changes

Depending on the experimental queue_send_response_body flag, either Promise<void> or Promise<QueueSendResponse> will be returned by the send methods. In order to support both the old and new return types, the functions had to be duplicated.

jsg::Promise<WorkerQueue::SendResponse> WorkerQueue::sendWithResponse(jsg::Lock& js,
    jsg::JsValue body,
    jsg::Optional<SendOptions> options,
    const jsg::TypeHandler<SendResponse>& responseHandler) {

  ...

  static constexpr auto handleSend = [](auto req, auto serialized, auto client, auto& headerIds,
                                         bool exposeErrorCodes) -> kj::Promise<kj::String> {
    ...

    auto responseBody = co_await response.body->readAllBytes();
    co_return kj::str(responseBody.asChars());
  };

  auto promise =
      handleSend(kj::mv(req), kj::mv(serialized), kj::mv(client), headerIds, exposeErrorCodes);

  return context.awaitIo(
      js, kj::mv(promise), [&responseHandler](jsg::Lock& js, kj::String text) -> SendResponse {
    auto parsed = jsg::JsValue::fromJson(js, text);
    KJ_IF_SOME(result, responseHandler.tryUnwrap(js, parsed)) {
      return kj::mv(result);
    }
    _JSG_INTERNAL_FAIL_REQUIRE(JSG_EXCEPTION(Error), "Failed to parse queue send response", text);
  });
}

On the Typescript side, these new functions are exposed with the same name.

  JSG_RESOURCE_TYPE(WorkerQueue, CompatibilityFlags::Reader flags) {
    if (flags.getQueueSendResponseBody()) {
      JSG_METHOD_NAMED(send, sendWithResponse);
      JSG_METHOD_NAMED(sendBatch, sendBatchWithResponse);
    } else {
      JSG_METHOD(send);
      JSG_METHOD(sendBatch);
    }

Upstream changes can be found here: https://gitlab.cfdata.org/cloudflare/mq/queue-broker-worker/-/merge_requests/1768#c210bdd061230e9c1f9da3b517fbecabd025c5c4

Testing

  • bazel test //src/workerd/api/tests:queue-test@
  • bazel test //src/workerd/api/tests:queue-test@all-compat-flags
  • bazel test //src/workerd/api/tests:queue-producer-metadata-test@
  • bazel test //src/workerd/api/tests:queue-producer-metadata-test@all-compat-flags

@KennethRuan KennethRuan requested review from a team as code owners March 18, 2026 21:50
jsg::JsValue body,
jsg::Optional<SendOptions> options,
const jsg::TypeHandler<SendResponse>& responseHandler);

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forget, did we decide to put send() / sendBatch() with responses behind a compat flag? To me this change looks non-breaking, and so does not necessarily need to be behind a compat flag. It'd certainly make implementation a lot simpler too?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving this here for posterity:
Based on discussions, we decided to compat flag these changes. After internal testing, we'll need to put in a PR to default on flags and also regenerate the production type files. We were imagining all the metrics changes can go in together, so this may be the easier approach for us.

Link to other metrics-related Queues PRs:
#6246
#6339

@codecov-commenter
Copy link

Codecov Report

❌ Patch coverage is 64.66667% with 53 lines in your changes missing coverage. Please review.
✅ Project coverage is 70.67%. Comparing base (04c6588) to head (7bc3b24).
⚠️ Report is 145 commits behind head on main.

Files with missing lines Patch % Lines
src/workerd/api/queue.c++ 64.66% 34 Missing and 19 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #6354      +/-   ##
==========================================
- Coverage   70.80%   70.67%   -0.14%     
==========================================
  Files         422      420       -2     
  Lines      112347   113176     +829     
  Branches    18411    18561     +150     
==========================================
+ Hits        79547    79986     +439     
- Misses      21806    22128     +322     
- Partials    10994    11062      +68     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@KennethRuan KennethRuan force-pushed the kruan/MQ-1200-read-response-body-from-QUEUE-send-and-sendbatch branch from 7bc3b24 to a88a242 Compare March 22, 2026 20:33
@KennethRuan KennethRuan force-pushed the kruan/MQ-1200-read-response-body-from-QUEUE-send-and-sendbatch branch from a88a242 to e7cc945 Compare March 22, 2026 20:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants