MQ-1202 Include metrics metadata in queue() handler message batch#6339
MQ-1202 Include metrics metadata in queue() handler message batch#6339KennethRuan wants to merge 1 commit intocloudflare:mainfrom
Conversation
|
/bigbonk review this |
There was a problem hiding this comment.
Review summary
Overall this is a solid, well-structured change. The compat flag gating, capnp schema additions, and test coverage are all done correctly. A few items to address:
- [Medium]
backlogCountandbacklogByteswould read more naturally as integer types rather thandoubleat the JS API level — consider whether the upstream data truly requires floating-point representation. - [Low]
QueueEvent::getMetadata()andQueueController::getMetadata()returnMessageBatchMetadataby value, creating a copy on every property access. For a struct of three doubles this is cheap, but worth noting for consistency with lazy-evaluation patterns used formessages. - [Low] The
Fetcher::queue()TS override showsmetadata?: MessageBatchMetadataunconditionally, meaning the type signature is visible in tooling even for workers without thequeue_batch_metadataflag. This is fine for a testing API, but worth confirming it's intentional. - [Low] The disabled-flag test service could use a
compatibilityDateto be more explicit.
This review was generated by an AI assistant and may contain inaccuracies. Please verify all suggestions before applying.
| messages = messagesBuilder.finish(); | ||
|
|
||
| // Extract metadata. If the sender didn't set the field, capnp defaults all values to zero. | ||
| auto m = params.getMetadata().getMetrics(); |
There was a problem hiding this comment.
Good comment explaining the capnp default-to-zero behavior. This is important for forward/backward compatibility when the sender doesn't populate the metadata field.
| fetch(input: RequestInfo | URL, init?: RequestInit): Promise<Response>; | ||
| connect(address: SocketAddress | string, options?: SocketOptions): Socket; | ||
| queue(queueName: string, messages: ServiceBindingQueueMessage[]): Promise<FetcherQueueResult>; | ||
| queue(queueName: string, messages: ServiceBindingQueueMessage[], metadata?: MessageBatchMetadata): Promise<FetcherQueueResult>; |
There was a problem hiding this comment.
The metadata?: MessageBatchMetadata parameter in the Fetcher TS override is not gated by the queue_batch_metadata compat flag — it's always visible when service_binding_extra_handlers is on. This means tooling will suggest the parameter even for workers that don't have the metadata flag enabled (the consumer just won't see the data on the batch).
This is reasonable for a testing/service-binding API, but worth confirming this is intentional. Alternatively, you could conditionally include the metadata parameter in the TS override based on the flag, though that adds complexity for arguably little benefit.
There was a problem hiding this comment.
EDIT: After some more investigation, I would like to keep the metadata?: MessageBatchMetadata parameter on Fetcher ungated.
If we were to add the flag to both the caller side (Fetcher::queue()) and receiver side (queue() handler), I believe it would introduce unnecessary, albeit minor, complexity for rollout and reverts. The Fetcher::queue() method is already behind the experimental service_binding_extra_handlers so we should be safe to make these changes.
That being said, I don't hold a strong opinion on this, so happy to change :)
|
Review posted successfully. Here's a summary of the findings: Key findings posted on PR #6339:
No safety, memory, or correctness issues were found. The capnp ordinal ( |
6ea1850 to
c232a53
Compare
c232a53 to
244cd5c
Compare
244cd5c to
c395270
Compare
Summary
Adds a new metadata property to queue message batches (
MessageBatch/QueueEvent), exposing queue metrics with each batch delivered to a worker'squeue()handler.Changes
This PR includes changes to
queue.c++andqueue.hto include metrics on each message batch. The metadata contains three metrics from the upstream queue broker worker (backlogCount,backlogBytes,oldestMessageTimestamp). Below is what the generated types will look like:The new type changes are gated behind a
queue_batch_metadatacompat flag so that it can be tested internally before release. Additionally, changes were made tohttp.c++andhttp.hso that theFetcherused for testing can accept an optionalmetadataargument.Upstream changes can be found here: https://gitlab.cfdata.org/cloudflare/mq/queue-broker-worker/-/merge_requests/1765
Testing
bazel test //src/workerd/api/tests:queue-test@bazel test //src/workerd/api/tests:queue-test@all-compat-flagsbazel test //src/workerd/api/tests:queue-metadata-test@bazel test //src/workerd/api/tests:queue-metadata-test@all-compat-flagsbazel test //types:test/types/rpc