-
Notifications
You must be signed in to change notification settings - Fork 592
MQ-1202 Include metrics metadata in queue() handler message batch #6339
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
KennethRuan
wants to merge
1
commit into
cloudflare:main
Choose a base branch
from
KennethRuan:kruan/MQ-1202-add-metrics-metadata-to-queue-handler
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+243
−19
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -445,11 +445,23 @@ QueueEvent::QueueEvent( | |
| messagesBuilder.add(js.alloc<QueueMessage>(js, incoming[i], result)); | ||
| } | ||
| messages = messagesBuilder.finish(); | ||
|
|
||
| // Extract metadata. If the sender didn't set the field, capnp defaults all values to zero. | ||
| auto m = params.getMetadata().getMetrics(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good comment explaining the capnp default-to-zero behavior. This is important for forward/backward compatibility when the sender doesn't populate the metadata field. |
||
| metadata = MessageBatchMetadata{ | ||
| .metrics = | ||
| MessageBatchMetrics{ | ||
| .backlogCount = m.getBacklogCount(), | ||
| .backlogBytes = m.getBacklogBytes(), | ||
| .oldestMessageTimestamp = m.getOldestMessageTimestamp(), | ||
| }, | ||
| }; | ||
| } | ||
|
|
||
| QueueEvent::QueueEvent(jsg::Lock& js, Params params, IoPtr<QueueEventResult> result) | ||
| : ExtendableEvent("queue"), | ||
| queueName(kj::mv(params.queueName)), | ||
| metadata(kj::mv(params.metadata)), | ||
| result(result) { | ||
| auto messagesBuilder = kj::heapArrayBuilder<jsg::Ref<QueueMessage>>(params.messages.size()); | ||
| for (auto i: kj::indices(params.messages)) { | ||
|
|
@@ -742,6 +754,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEvent::sendRpc( | |
| KJ_CASE_ONEOF(p, rpc::EventDispatcher::QueueParams::Reader) { | ||
| req.setQueueName(p.getQueueName()); | ||
| req.setMessages(p.getMessages()); | ||
| req.setMetadata(p.getMetadata()); | ||
| } | ||
| KJ_CASE_ONEOF(p, QueueEvent::Params) { | ||
| req.setQueueName(p.queueName); | ||
|
|
@@ -755,6 +768,13 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEvent::sendRpc( | |
| } | ||
| messages[i].setAttempts(p.messages[i].attempts); | ||
| } | ||
| { | ||
| auto metadataBuilder = req.initMetadata(); | ||
| auto metricsBuilder = metadataBuilder.initMetrics(); | ||
| metricsBuilder.setBacklogCount(p.metadata.metrics.backlogCount); | ||
| metricsBuilder.setBacklogBytes(p.metadata.metrics.backlogBytes); | ||
| metricsBuilder.setOldestMessageTimestamp(p.metadata.metrics.oldestMessageTimestamp); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| // Copyright (c) 2026 Cloudflare, Inc. | ||
| // Licensed under the Apache 2.0 license found in the LICENSE file or at: | ||
| // https://opensource.org/licenses/Apache-2.0 | ||
|
|
||
| import assert from 'node:assert'; | ||
|
|
||
| export default { | ||
| async queue(batch, env, ctx) { | ||
| const flagEnabled = env.METADATA_FLAG; | ||
|
|
||
| if (!flagEnabled) { | ||
| // Flag disabled → metadata property should not exist | ||
| assert.strictEqual(batch.metadata, undefined); | ||
| batch.ackAll(); | ||
| return; | ||
| } | ||
|
|
||
| // Flag enabled → metadata should always be present | ||
| assert.ok(batch.metadata, 'Expected batch.metadata to be defined'); | ||
| assert.ok( | ||
| batch.metadata.metrics, | ||
| 'Expected batch.metadata.metrics to be defined' | ||
| ); | ||
|
|
||
| if ( | ||
| batch.metadata.metrics.backlogCount === 0 && | ||
| batch.metadata.metrics.backlogBytes === 0 && | ||
| batch.metadata.metrics.oldestMessageTimestamp === 0 | ||
| ) { | ||
| // If metadata is omitted → all values default to zero | ||
| batch.ackAll(); | ||
| return; | ||
| } | ||
|
|
||
| // Explicit metadata path | ||
| assert.strictEqual(batch.metadata.metrics.backlogCount, 100); | ||
| assert.strictEqual(batch.metadata.metrics.backlogBytes, 2048); | ||
| assert.strictEqual(batch.metadata.metrics.oldestMessageTimestamp, 1000000); | ||
| batch.ackAll(); | ||
| }, | ||
|
|
||
| async test(ctrl, env, ctx) { | ||
| const flagEnabled = env.METADATA_FLAG; | ||
| const timestamp = new Date(); | ||
|
|
||
| if (flagEnabled) { | ||
| const response1 = await env.SERVICE.queue( | ||
| 'test-queue', | ||
| [{ id: '0', timestamp, body: 'hello', attempts: 1 }], | ||
| { | ||
| metrics: { | ||
| backlogCount: 100, | ||
| backlogBytes: 2048, | ||
| oldestMessageTimestamp: 1000000, | ||
| }, | ||
| } | ||
| ); | ||
| assert.strictEqual(response1.outcome, 'ok'); | ||
| assert(response1.ackAll); | ||
|
|
||
| // Test with omitted metadata | ||
| const response2 = await env.SERVICE.queue('test-queue', [ | ||
| { id: '1', timestamp, body: 'world', attempts: 1 }, | ||
| ]); | ||
| assert.strictEqual(response2.outcome, 'ok'); | ||
| assert(response2.ackAll); | ||
| } else { | ||
| // Flag disabled → handler still works, metadata not visible | ||
| const response = await env.SERVICE.queue('test-queue', [ | ||
| { id: '0', timestamp, body: 'foobar', attempts: 1 }, | ||
| ]); | ||
| assert.strictEqual(response.outcome, 'ok'); | ||
| assert(response.ackAll); | ||
| } | ||
| }, | ||
| }; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| using Workerd = import "/workerd/workerd.capnp"; | ||
|
|
||
| const unitTests :Workerd.Config = ( | ||
| services = [ | ||
| ( name = "queue-metadata-test", | ||
| worker = ( | ||
| modules = [ | ||
| ( name = "worker", esModule = embed "queue-metadata-test.js" ) | ||
| ], | ||
| bindings = [ | ||
| ( name = "SERVICE", service = "queue-metadata-test" ), | ||
| ( name = "METADATA_FLAG", json = "true" ), | ||
| ], | ||
| compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "experimental"], | ||
| ) | ||
| ), | ||
| ( name = "queue-metadata-disabled-test", | ||
jasnell marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| worker = ( | ||
| modules = [ | ||
| ( name = "worker-disabled", esModule = embed "queue-metadata-test.js" ) | ||
| ], | ||
| bindings = [ | ||
| ( name = "SERVICE", service = "queue-metadata-disabled-test" ), | ||
| ( name = "METADATA_FLAG", json = "false" ), | ||
| ], | ||
| compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers"], | ||
| ) | ||
| ), | ||
| ], | ||
| ); | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
metadata?: MessageBatchMetadataparameter in the Fetcher TS override is not gated by thequeue_batch_metadatacompat flag — it's always visible whenservice_binding_extra_handlersis on. This means tooling will suggest the parameter even for workers that don't have the metadata flag enabled (the consumer just won't see the data on the batch).This is reasonable for a testing/service-binding API, but worth confirming this is intentional. Alternatively, you could conditionally include the
metadataparameter in the TS override based on the flag, though that adds complexity for arguably little benefit.Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EDIT: After some more investigation, I would like to keep the
metadata?: MessageBatchMetadataparameter onFetcherungated.If we were to add the flag to both the caller side (
Fetcher::queue()) and receiver side (queue()handler), I believe it would introduce unnecessary, albeit minor, complexity for rollout and reverts. TheFetcher::queue()method is already behind the experimentalservice_binding_extra_handlersso we should be safe to make these changes.That being said, I don't hold a strong opinion on this, so happy to change :)