Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/workerd/api/http.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2230,8 +2230,10 @@ jsg::Promise<void> Fetcher::delete_(jsg::Lock& js, kj::String url) {
return throwOnError(js, "DELETE", fetchImpl(js, JSG_THIS, kj::mv(url), kj::mv(subInit)));
}

jsg::Promise<Fetcher::QueueResult> Fetcher::queue(
jsg::Lock& js, kj::String queueName, kj::Array<ServiceBindingQueueMessage> messages) {
jsg::Promise<Fetcher::QueueResult> Fetcher::queue(jsg::Lock& js,
kj::String queueName,
kj::Array<ServiceBindingQueueMessage> messages,
jsg::Optional<MessageBatchMetadata> metadata) {
auto& ioContext = IoContext::current();

auto encodedMessages = kj::heapArrayBuilder<IncomingQueueMessage>(messages.size());
Expand Down Expand Up @@ -2264,6 +2266,7 @@ jsg::Promise<Fetcher::QueueResult> Fetcher::queue(
auto event = kj::refcounted<api::QueueCustomEvent>(QueueEvent::Params{
.queueName = kj::mv(queueName),
.messages = encodedMessages.finish(),
.metadata = kj::mv(metadata).orDefault({}),
});

auto eventRef =
Expand Down
8 changes: 5 additions & 3 deletions src/workerd/api/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,10 @@ class Fetcher: public JsRpcClientProvider {
JSG_STRUCT(outcome, ackAll, retryBatch, explicitAcks, retryMessages);
};

jsg::Promise<QueueResult> queue(
jsg::Lock& js, kj::String queueName, kj::Array<ServiceBindingQueueMessage> messages);
jsg::Promise<QueueResult> queue(jsg::Lock& js,
kj::String queueName,
kj::Array<ServiceBindingQueueMessage> messages,
jsg::Optional<MessageBatchMetadata> metadata);

struct ScheduledOptions {
jsg::Optional<kj::Date> scheduledTime;
Expand Down Expand Up @@ -446,7 +448,7 @@ class Fetcher: public JsRpcClientProvider {
) & {
fetch(input: RequestInfo | URL, init?: RequestInit): Promise<Response>;
connect(address: SocketAddress | string, options?: SocketOptions): Socket;
queue(queueName: string, messages: ServiceBindingQueueMessage[]): Promise<FetcherQueueResult>;
queue(queueName: string, messages: ServiceBindingQueueMessage[], metadata?: MessageBatchMetadata): Promise<FetcherQueueResult>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The metadata?: MessageBatchMetadata parameter in the Fetcher TS override is not gated by the queue_batch_metadata compat flag — it's always visible when service_binding_extra_handlers is on. This means tooling will suggest the parameter even for workers that don't have the metadata flag enabled (the consumer just won't see the data on the batch).

This is reasonable for a testing/service-binding API, but worth confirming this is intentional. Alternatively, you could conditionally include the metadata parameter in the TS override based on the flag, though that adds complexity for arguably little benefit.

Copy link
Author

@KennethRuan KennethRuan Mar 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EDIT: After some more investigation, I would like to keep the metadata?: MessageBatchMetadata parameter on Fetcher ungated.

If we were to add the flag to both the caller side (Fetcher::queue()) and receiver side (queue() handler), I believe it would introduce unnecessary, albeit minor, complexity for rollout and reverts. The Fetcher::queue() method is already behind the experimental service_binding_extra_handlers so we should be safe to make these changes.

That being said, I don't hold a strong opinion on this, so happy to change :)

scheduled(options?: FetcherScheduledOptions): Promise<FetcherScheduledResult>;
});
} else {
Expand Down
20 changes: 20 additions & 0 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -445,11 +445,23 @@ QueueEvent::QueueEvent(
messagesBuilder.add(js.alloc<QueueMessage>(js, incoming[i], result));
}
messages = messagesBuilder.finish();

// Extract metadata. If the sender didn't set the field, capnp defaults all values to zero.
auto m = params.getMetadata().getMetrics();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good comment explaining the capnp default-to-zero behavior. This is important for forward/backward compatibility when the sender doesn't populate the metadata field.

metadata = MessageBatchMetadata{
.metrics =
MessageBatchMetrics{
.backlogCount = m.getBacklogCount(),
.backlogBytes = m.getBacklogBytes(),
.oldestMessageTimestamp = m.getOldestMessageTimestamp(),
},
};
}

QueueEvent::QueueEvent(jsg::Lock& js, Params params, IoPtr<QueueEventResult> result)
: ExtendableEvent("queue"),
queueName(kj::mv(params.queueName)),
metadata(kj::mv(params.metadata)),
result(result) {
auto messagesBuilder = kj::heapArrayBuilder<jsg::Ref<QueueMessage>>(params.messages.size());
for (auto i: kj::indices(params.messages)) {
Expand Down Expand Up @@ -742,6 +754,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEvent::sendRpc(
KJ_CASE_ONEOF(p, rpc::EventDispatcher::QueueParams::Reader) {
req.setQueueName(p.getQueueName());
req.setMessages(p.getMessages());
req.setMetadata(p.getMetadata());
}
KJ_CASE_ONEOF(p, QueueEvent::Params) {
req.setQueueName(p.queueName);
Expand All @@ -755,6 +768,13 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEvent::sendRpc(
}
messages[i].setAttempts(p.messages[i].attempts);
}
{
auto metadataBuilder = req.initMetadata();
auto metricsBuilder = metadataBuilder.initMetrics();
metricsBuilder.setBacklogCount(p.metadata.metrics.backlogCount);
metricsBuilder.setBacklogBytes(p.metadata.metrics.backlogBytes);
metricsBuilder.setOldestMessageTimestamp(p.metadata.metrics.oldestMessageTimestamp);
}
}
}

Expand Down
70 changes: 59 additions & 11 deletions src/workerd/api/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,22 @@ class WorkerQueue: public jsg::Object {

// Event handler types

// Metadata delivered with a message batch in the queue() handler

struct MessageBatchMetrics {
double backlogCount;
double backlogBytes;
double oldestMessageTimestamp;
JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp);
JSG_STRUCT_TS_OVERRIDE(MessageBatchMetrics);
};

struct MessageBatchMetadata {
MessageBatchMetrics metrics;
JSG_STRUCT(metrics);
JSG_STRUCT_TS_OVERRIDE(MessageBatchMetadata);
};

// Types for other workers passing messages into and responses out of a queue handler.

struct IncomingQueueMessage {
Expand Down Expand Up @@ -212,6 +228,7 @@ class QueueEvent final: public ExtendableEvent {
struct Params {
kj::String queueName;
kj::Array<IncomingQueueMessage> messages;
MessageBatchMetadata metadata;
};

explicit QueueEvent(jsg::Lock& js,
Expand All @@ -227,30 +244,45 @@ class QueueEvent final: public ExtendableEvent {
kj::StringPtr getQueueName() {
return queueName;
}
MessageBatchMetadata getMetadata() {
return metadata;
}

void retryAll(jsg::Optional<QueueRetryOptions> options);
void ackAll();

JSG_RESOURCE_TYPE(QueueEvent) {
JSG_RESOURCE_TYPE(QueueEvent, CompatibilityFlags::Reader flags) {
JSG_INHERIT(ExtendableEvent);

JSG_LAZY_READONLY_INSTANCE_PROPERTY(messages, getMessages);
JSG_READONLY_INSTANCE_PROPERTY(queue, getQueueName);

if (flags.getWorkerdExperimental()) {
JSG_READONLY_INSTANCE_PROPERTY(metadata, getMetadata);
}

JSG_METHOD(retryAll);
JSG_METHOD(ackAll);

JSG_TS_ROOT();
JSG_TS_OVERRIDE(QueueEvent<Body = unknown> {
readonly messages: readonly Message<Body>[];
});
if (flags.getWorkerdExperimental()) {
JSG_TS_OVERRIDE(QueueEvent<Body = unknown> {
readonly messages: readonly Message<Body>[];
readonly metadata: MessageBatchMetadata;
});
} else {
JSG_TS_OVERRIDE(QueueEvent<Body = unknown> {
readonly messages: readonly Message<Body>[];
});
}
}

void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
for (auto& message: messages) {
tracker.trackField("message", message);
}
tracker.trackField("queueName", queueName);
tracker.trackFieldWithSize("metadata", sizeof(MessageBatchMetadata));
tracker.trackFieldWithSize("IoPtr<QueueEventResult>", sizeof(IoPtr<QueueEventResult>));
}

Expand All @@ -274,6 +306,7 @@ class QueueEvent final: public ExtendableEvent {
// array to avoid one intermediate copy?
kj::Array<jsg::Ref<QueueMessage>> messages;
kj::String queueName;
MessageBatchMetadata metadata;
IoPtr<QueueEventResult> result;
CompletionStatus completionStatus = Incomplete{};

Expand All @@ -293,24 +326,38 @@ class QueueController final: public jsg::Object {
kj::StringPtr getQueueName() {
return event->getQueueName();
}
MessageBatchMetadata getMetadata() {
return event->getMetadata();
}
void retryAll(jsg::Optional<QueueRetryOptions> options) {
event->retryAll(options);
}
void ackAll() {
event->ackAll();
}

JSG_RESOURCE_TYPE(QueueController) {
JSG_RESOURCE_TYPE(QueueController, CompatibilityFlags::Reader flags) {
JSG_READONLY_INSTANCE_PROPERTY(messages, getMessages);
JSG_READONLY_INSTANCE_PROPERTY(queue, getQueueName);

if (flags.getWorkerdExperimental()) {
JSG_READONLY_INSTANCE_PROPERTY(metadata, getMetadata);
}

JSG_METHOD(retryAll);
JSG_METHOD(ackAll);

JSG_TS_ROOT();
JSG_TS_OVERRIDE(MessageBatch<Body = unknown> {
readonly messages: readonly Message<Body>[];
});
if (flags.getWorkerdExperimental()) {
JSG_TS_OVERRIDE(MessageBatch<Body = unknown> {
readonly messages: readonly Message<Body>[];
readonly metadata: MessageBatchMetadata;
});
} else {
JSG_TS_OVERRIDE(MessageBatch<Body = unknown> {
readonly messages: readonly Message<Body>[];
});
}
}

void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
Expand Down Expand Up @@ -377,8 +424,9 @@ class QueueCustomEvent final: public WorkerInterface::CustomEvent, public kj::Re

#define EW_QUEUE_ISOLATE_TYPES \
api::WorkerQueue, api::WorkerQueue::SendOptions, api::WorkerQueue::SendBatchOptions, \
api::WorkerQueue::MessageSendRequest, api::IncomingQueueMessage, api::QueueRetryBatch, \
api::QueueRetryMessage, api::QueueResponse, api::QueueRetryOptions, api::QueueMessage, \
api::QueueEvent, api::QueueController, api::QueueExportedHandler
api::WorkerQueue::MessageSendRequest, api::MessageBatchMetrics, api::MessageBatchMetadata, \
api::IncomingQueueMessage, api::QueueRetryBatch, api::QueueRetryMessage, api::QueueResponse, \
api::QueueRetryOptions, api::QueueMessage, api::QueueEvent, api::QueueController, \
api::QueueExportedHandler

} // namespace workerd::api
6 changes: 6 additions & 0 deletions src/workerd/api/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ wd_test(
],
)

wd_test(
src = "queue-metadata-test.wd-test",
args = ["--experimental"],
data = ["queue-metadata-test.js"],
)

wd_test(
src = "r2-test.wd-test",
args = ["--experimental"],
Expand Down
76 changes: 76 additions & 0 deletions src/workerd/api/tests/queue-metadata-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2026 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import assert from 'node:assert';

export default {
async queue(batch, env, ctx) {
const flagEnabled = env.METADATA_FLAG;

if (!flagEnabled) {
// Flag disabled → metadata property should not exist
assert.strictEqual(batch.metadata, undefined);
batch.ackAll();
return;
}

// Flag enabled → metadata should always be present
assert.ok(batch.metadata, 'Expected batch.metadata to be defined');
assert.ok(
batch.metadata.metrics,
'Expected batch.metadata.metrics to be defined'
);

if (
batch.metadata.metrics.backlogCount === 0 &&
batch.metadata.metrics.backlogBytes === 0 &&
batch.metadata.metrics.oldestMessageTimestamp === 0
) {
// If metadata is omitted → all values default to zero
batch.ackAll();
return;
}

// Explicit metadata path
assert.strictEqual(batch.metadata.metrics.backlogCount, 100);
assert.strictEqual(batch.metadata.metrics.backlogBytes, 2048);
assert.strictEqual(batch.metadata.metrics.oldestMessageTimestamp, 1000000);
batch.ackAll();
},

async test(ctrl, env, ctx) {
const flagEnabled = env.METADATA_FLAG;
const timestamp = new Date();

if (flagEnabled) {
const response1 = await env.SERVICE.queue(
'test-queue',
[{ id: '0', timestamp, body: 'hello', attempts: 1 }],
{
metrics: {
backlogCount: 100,
backlogBytes: 2048,
oldestMessageTimestamp: 1000000,
},
}
);
assert.strictEqual(response1.outcome, 'ok');
assert(response1.ackAll);

// Test with omitted metadata
const response2 = await env.SERVICE.queue('test-queue', [
{ id: '1', timestamp, body: 'world', attempts: 1 },
]);
assert.strictEqual(response2.outcome, 'ok');
assert(response2.ackAll);
} else {
// Flag disabled → handler still works, metadata not visible
const response = await env.SERVICE.queue('test-queue', [
{ id: '0', timestamp, body: 'foobar', attempts: 1 },
]);
assert.strictEqual(response.outcome, 'ok');
assert(response.ackAll);
}
},
};
30 changes: 30 additions & 0 deletions src/workerd/api/tests/queue-metadata-test.wd-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "queue-metadata-test",
worker = (
modules = [
( name = "worker", esModule = embed "queue-metadata-test.js" )
],
bindings = [
( name = "SERVICE", service = "queue-metadata-test" ),
( name = "METADATA_FLAG", json = "true" ),
],
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "experimental"],
)
),
( name = "queue-metadata-disabled-test",
worker = (
modules = [
( name = "worker-disabled", esModule = embed "queue-metadata-test.js" )
],
bindings = [
( name = "SERVICE", service = "queue-metadata-disabled-test" ),
( name = "METADATA_FLAG", json = "false" ),
],
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers"],
)
),
],
);
20 changes: 18 additions & 2 deletions src/workerd/io/worker-interface.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,20 @@ struct QueueResponse @0x90e98932c0bfc0de {
# List of retry options for messages that were explicitly marked for retry.
}

struct MessageBatchMetrics {
backlogCount @0 :Float64;
# Number of messages remaining in the queue backlog.
backlogBytes @1 :Float64;
# Total bytes of messages remaining in the queue backlog.
oldestMessageTimestamp @2 :Float64;
# Timestamp (ms since epoch) of the oldest message in the queue.
}

struct MessageBatchMetadata {
metrics @0 :MessageBatchMetrics;
# Best effort queue metrics at the time the batch was dispatched.
}

struct HibernatableWebSocketEventMessage {
payload :union {
text @0 :Text;
Expand Down Expand Up @@ -756,11 +770,13 @@ interface EventDispatcher @0xf20697475ec1752d {
# It would be cleaner to handle that inside the implementation so we could mark the entire
# interface (and file) with allowCancellation.

queue @8 (messages :List(QueueMessage), queueName :Text) -> (result :QueueResponse)
queue @8 (messages :List(QueueMessage), queueName :Text, metadata :MessageBatchMetadata)
-> (result :QueueResponse)
$Cxx.allowCancellation;
# Delivers a batch of queue messages to a worker's queue event handler. Returns information about
# the success of the batch, including which messages should be considered acknowledged and which
# should be retried.
# should be retried. The optional metadata field carries queue metrics at the time the batch was
# dispatched; it is safe for the sender to omit this field (the consumer sees it as absent).

jsRpcSession @9 () -> (topLevel :JsRpcTarget) $Cxx.allowCancellation;
# Opens a JS rpc "session". The call does not return until the session is complete.
Expand Down
Loading
Loading