-
Notifications
You must be signed in to change notification settings - Fork 192
feat(router): implement Cosmo Streams v1 #2273
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Co-authored-by: Ludwig Bedacht <[email protected]> Co-authored-by: StarpTech <[email protected]> Co-authored-by: Dominik Korittki <[email protected]>
Co-authored-by: Ludwig Bedacht <[email protected]> Co-authored-by: StarpTech <[email protected]> Co-authored-by: Dominik Korittki <[email protected]>
WalkthroughAdds Cosmo Streams v1: new provider-agnostic stream/event types, hook interfaces and contexts (SubscriptionOnStart, OnReceiveEvents, OnPublishEvents), threads hooks through router core and pubsub layers, refactors provider/adapter/engine datasource APIs to use the new abstractions, and adds tests, mocks and configuration for concurrency/timeouts. Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Areas to focus review on:
Possibly related PRs
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Merge conflict was on go.mod files in router and router-tests, because a new upstream engine version was used. I go-getted the latest engine version of the topic branch, which includes the latest engine changes + cosmo streams.
❌ Internal Query Planner CI checks failedThe Internal Query Planner CI checks failed in the celestial repository, and this is going to stop the merge of this PR. |
Co-authored-by: Alessandro Pagnin <[email protected]>
) Co-authored-by: Alessandro Pagnin <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (2)
router-tests/modules/start_subscription_test.go (2)
173-255: Avoid blocking in StartSubscription callback via unbufferedcallbackCalledchannelIn the “close to true” test, the callback does:
callbackCalled := make(chan bool) Callback: func(ctx core.SubscriptionOnStartHandlerContext) error { callbackCalled <- true return core.NewHttpGraphqlError("subscription closed", http.StatusText(http.StatusOK), http.StatusOK) }and the test only reads from
callbackCalledafterWaitForSubscriptionCount(1, ...).If the router’s SubscriptionOnStart pipeline runs callbacks on the request’s critical path before incrementing the subscription count, this pattern could deadlock: the callback blocks on the send, the subscription count never reaches 1, and
WaitForSubscriptionCounttimes out.Even if today’s implementation uses a non‑blocking or separate goroutine for the callback, this test couples itself tightly to that detail.
To make the test robust against pipeline changes, consider one of:
- callbackCalled := make(chan bool) + callbackCalled := make(chan bool, 1)or reordering to wait on the callback first (wrapped in
AwaitChannelWithT), then asserting subscription counts.
257-357: Channel length assertion on unbufferedsubscriptionOneArgsChis effectively a no‑op
subscriptionOneArgsChis created unbuffered:subscriptionOneArgsCh := make(chan kafkaSubscriptionArgs)and later you do:
t.Cleanup(func() { require.Len(t, subscriptionOneArgsCh, 0) })For an unbuffered channel,
len(subscriptionOneArgsCh)will always be 0 unless there is a buffered element (which never happens), so this assertion doesn’t actually prove that the callback was never invoked—the real guard is that any send would block and the test would hang.If you want to assert that the “should not be called” callback never ran in a more explicit way, you could either:
- Make the channel buffered and check it after the client is closed, e.g.:
- subscriptionOneArgsCh := make(chan kafkaSubscriptionArgs) + subscriptionOneArgsCh := make(chan kafkaSubscriptionArgs, 1) ... t.Cleanup(func() { require.Len(t, subscriptionOneArgsCh, 0) })
- Or replace the cleanup with a simple boolean flag set in the callback and assert it remains false.
Otherwise, consider dropping the
require.Lencleanup to avoid a misleading assertion.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
router-tests/modules/start-subscription/module.go(1 hunks)router-tests/modules/start_subscription_test.go(1 hunks)router-tests/modules/stream-publish/module.go(1 hunks)router-tests/modules/stream-receive/module.go(1 hunks)router/pkg/pubsub/datasource/mocks.go(15 hunks)router/pkg/pubsub/pubsub_test.go(8 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- router-tests/modules/stream-publish/module.go
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
📚 Learning: 2025-08-28T09:18:10.121Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:100-108
Timestamp: 2025-08-28T09:18:10.121Z
Learning: In router-tests/http_subscriptions_test.go heartbeat tests, the message ordering should remain strict with data messages followed by heartbeat messages, as the timing is deterministic and known by design in the Cosmo router implementation.
Applied to files:
router-tests/modules/start_subscription_test.go
📚 Learning: 2025-08-28T09:17:49.477Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
Applied to files:
router-tests/modules/start_subscription_test.go
📚 Learning: 2025-10-01T20:39:16.113Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.
Applied to files:
router-tests/modules/start_subscription_test.gorouter/pkg/pubsub/pubsub_test.go
📚 Learning: 2025-11-13T10:10:47.680Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2329
File: router/pkg/pubsub/datasource/subscription_event_updater.go:86-88
Timestamp: 2025-11-13T10:10:47.680Z
Learning: In router/pkg/pubsub/datasource/subscription_event_updater.go, the SetHooks method is intentionally designed to only replace hook handlers, not reconfigure timeout or semaphore settings. The timeout and semaphore fields are meant to be set once during construction via NewSubscriptionEventUpdater and remain immutable. If different timeout or concurrency settings are needed, a new updater instance should be created rather than modifying the existing one.
Applied to files:
router-tests/modules/start_subscription_test.gorouter/pkg/pubsub/pubsub_test.gorouter/pkg/pubsub/datasource/mocks.go
📚 Learning: 2025-08-20T22:13:25.222Z
Learnt from: StarpTech
Repo: wundergraph/cosmo PR: 2157
File: router-tests/go.mod:16-16
Timestamp: 2025-08-20T22:13:25.222Z
Learning: github.com/mark3labs/mcp-go v0.38.0 has regressions and should not be used in the wundergraph/cosmo project. v0.36.0 is the stable version that should be used across router-tests and other modules.
Applied to files:
router/pkg/pubsub/pubsub_test.go
🧬 Code graph analysis (5)
router-tests/modules/start_subscription_test.go (9)
router-tests/testenv/testenv.go (2)
Run(105-122)Environment(1733-1769)router-tests/modules/start-subscription/module.go (1)
StartSubscriptionModule(13-17)router/core/router.go (3)
Option(172-172)WithModulesConfig(1679-1683)WithCustomModules(1786-1790)router-tests/testenv/utils.go (1)
AwaitChannelWithT(10-19)router/core/subscriptions_modules.go (1)
SubscriptionOnStartHandlerContext(15-31)router/pkg/pubsub/kafka/engine_datasource.go (2)
SubscriptionEventConfiguration(91-95)MutableEvent(62-66)router/pkg/pubsub/datasource/provider.go (1)
SubscriptionEventConfiguration(101-105)router/core/http_graphql_error.go (1)
NewHttpGraphqlError(22-28)router/core/context.go (1)
RequestContext(61-142)
router-tests/modules/start-subscription/module.go (3)
router/core/subscriptions_modules.go (2)
SubscriptionOnStartHandlerContext(15-31)SubscriptionOnStartHandler(180-184)router/core/context.go (1)
RequestContext(61-142)router/core/modules.go (4)
ModuleContext(160-164)Module(52-54)ModuleInfo(44-50)EnginePostOriginHandler(132-136)
router/pkg/pubsub/pubsub_test.go (2)
router/pkg/pubsub/datasource/hooks.go (5)
Hooks(17-21)OnReceiveEventsHooks(34-38)OnReceiveEventsFn(14-14)OnPublishEventsHooks(29-31)OnPublishEventsFn(12-12)router/pkg/metric/noop_stream_metrics.go (1)
NewNoopStreamMetricStore(15-15)
router/pkg/pubsub/datasource/mocks.go (4)
router/pkg/pubsub/kafka/engine_datasource.go (3)
SubscriptionDataSource(209-211)PublishEventConfiguration(188-192)SubscriptionEventConfiguration(91-95)router/pkg/pubsub/redis/engine_datasource.go (3)
SubscriptionDataSource(163-165)PublishEventConfiguration(141-145)SubscriptionEventConfiguration(64-68)router/pkg/pubsub/datasource/datasource.go (2)
SubscriptionDataSource(8-13)EngineDataSourceFactory(23-39)router/pkg/pubsub/datasource/subscription_event_updater.go (1)
SubscriptionEventUpdater(21-26)
router-tests/modules/stream-receive/module.go (3)
router/core/subscriptions_modules.go (2)
StreamReceiveEventHandlerContext(247-263)StreamReceiveEventHandler(265-275)router/pkg/pubsub/datasource/provider.go (1)
StreamEvents(61-63)router/core/modules.go (3)
ModuleContext(160-164)Module(52-54)ModuleInfo(44-50)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
- GitHub Check: build-router
- GitHub Check: integration_test (./telemetry)
- GitHub Check: build_push_image (nonroot)
- GitHub Check: integration_test (./. ./fuzzquery ./lifecycle ./modules)
- GitHub Check: image_scan
- GitHub Check: image_scan (nonroot)
- GitHub Check: build_push_image
- GitHub Check: build_test
- GitHub Check: integration_test (./events)
- GitHub Check: Analyze (javascript-typescript)
- GitHub Check: Analyze (go)
🔇 Additional comments (13)
router-tests/modules/stream-receive/module.go (5)
10-15: LGTM!The module structure is well-defined with appropriate public fields for test flexibility. The
Callbackfield signature correctly matches theOnReceiveEventsmethod signature, enabling test injection.
17-22: LGTM!The
Provisionmethod follows the standard module initialization pattern and correctly captures the logger for later use.
24-34: LGTM!The
OnReceiveEventsimplementation correctly:
- Uses defensive nil checks for optional fields
- Delegates to the callback when provided
- Returns events unchanged when no callback is set
- Properly propagates errors from the callback
The defensive nil check for
Loggeris appropriate for a test module where instances might be created without proper initialization.
36-46: LGTM!The
Modulemethod correctly returns module metadata with appropriate priority for a test module. The constructor pattern is standard and allows tests to configure the module after instantiation.
48-51: LGTM!The interface guard is a best practice that ensures compile-time verification of the
core.StreamReceiveEventHandlerinterface implementation.router-tests/modules/start_subscription_test.go (4)
21-171: Solid coverage of basic hook invocation and Kafka write pathThe first two subtests exercise both the “no-op” StartSubscription hook (log-only) and the Kafka write path end‑to‑end, including subscription lifecycle and log observation. The client.Run goroutine is correctly bounded via
client.Close()andAwaitChannelWithT, which should prevent leaks and flakiness.
360-446: Good verification of HttpGraphqlError propagation to GraphQL clientThe “error is propagated to the client” test does a nice job of asserting that a
NewHttpGraphqlErrorfrom the StartSubscription hook surfaces asgraphql.Errorswith the expectedstatusCodeandcodeextensions. This gives strong coverage of the new error path and its client‑visible shape.
448-584: Engine subscription tests correctly cover StartSubscription for in‑router streamsThe engine subscription tests validate that the StartSubscription hook:
- Runs for engine‑backed subscriptions (without Kafka config), and
- Can inject an initial event via
ctx.NewEvent/ctx.WriteEventthat precedes engine‑produced events.The two
AwaitChannelWithTcalls for{"countEmp": 1000}and then{"countEmp": 0}give clear ordering guarantees, and the client lifecycle is properly closed viaclient.Close()and theclientRunChchannel.
586-661: OnOriginResponse behavior now matches test name and expectationThis test explicitly asserts that when the StartSubscription hook returns an error,
CallbackOnOriginResponseis not invoked by checkingrequire.Empty(t, originResponseCalled)on a buffered channel. That aligns with the test name and resolves the previous mismatch between name and behavior.router-tests/modules/start-subscription/module.go (1)
1-62: StartSubscriptionModule implementation is minimal, safe, and correctly wired
Provisionnow only assignsctx.LoggerandSubscriptionOnStartguards withif m.Logger != nil, resolving the nil‑logger panic risk from earlier review.SubscriptionOnStartandOnOriginResponsecleanly delegate to optional callbacks without side effects when unset.Module()uses the sharedmyModuleIDconstant, sets a reasonable priority, and exposes aNewfactory, and the interface guards ensure compile‑time compatibility withSubscriptionOnStartHandlerandEnginePostOriginHandler.This module is well‑structured for both tests and real integrations.
router/pkg/pubsub/pubsub_test.go (2)
71-71: Past review comment addressed.The
BuildProvidermock expectation now correctly matches the two-argument signature (datasource config and metric store).
305-305: LGTM: Hooks parameter consistently added across all provider integration tests.The
BuildProvidersAndDataSourcessignature has been correctly updated for NATS, Kafka, and Redis integration tests with the hooks parameter.Also applies to: 358-358, 411-411
router/pkg/pubsub/datasource/mocks.go (1)
201-253: LGTM: ResolveDataSourceSubscription return type correctly updated.The generated mock now returns
SubscriptionDataSourceinstead ofresolve.SubscriptionDataSource, matching the updated interface definition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
router-tests/modules/stream-publish/module.go (1)
10-10: Consider renaming for consistency with sibling module.The module ID
"publishModule"could follow the same naming pattern as the siblingStreamReceiveModulewhich uses"streamReceiveModule". Consider"streamPublishModule"for consistency.-const myModuleID = "publishModule" +const myModuleID = "streamPublishModule"router-tests/modules/stream-receive/module.go (1)
26-26: Consider a more specific log message for test clarity.The message "Stream Hook has been run" is less specific than the sibling
PublishModule's "Publish Hook has been run". For easier test debugging, consider "Receive Hook has been run".- m.Logger.Info("Stream Hook has been run") + m.Logger.Info("Receive Hook has been run")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
router-tests/modules/stream-publish/module.go(1 hunks)router-tests/modules/stream-receive/module.go(1 hunks)router/pkg/pubsub/pubsub_test.go(8 hunks)
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
📚 Learning: 2025-11-13T10:10:47.680Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2329
File: router/pkg/pubsub/datasource/subscription_event_updater.go:86-88
Timestamp: 2025-11-13T10:10:47.680Z
Learning: In router/pkg/pubsub/datasource/subscription_event_updater.go, the SetHooks method is intentionally designed to only replace hook handlers, not reconfigure timeout or semaphore settings. The timeout and semaphore fields are meant to be set once during construction via NewSubscriptionEventUpdater and remain immutable. If different timeout or concurrency settings are needed, a new updater instance should be created rather than modifying the existing one.
Applied to files:
router/pkg/pubsub/pubsub_test.go
📚 Learning: 2025-10-01T20:39:16.113Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.
Applied to files:
router/pkg/pubsub/pubsub_test.go
📚 Learning: 2025-08-28T09:17:49.477Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
Applied to files:
router/pkg/pubsub/pubsub_test.go
📚 Learning: 2025-08-20T22:13:25.222Z
Learnt from: StarpTech
Repo: wundergraph/cosmo PR: 2157
File: router-tests/go.mod:16-16
Timestamp: 2025-08-20T22:13:25.222Z
Learning: github.com/mark3labs/mcp-go v0.38.0 has regressions and should not be used in the wundergraph/cosmo project. v0.36.0 is the stable version that should be used across router-tests and other modules.
Applied to files:
router/pkg/pubsub/pubsub_test.go
🧬 Code graph analysis (3)
router/pkg/pubsub/pubsub_test.go (2)
router/pkg/pubsub/datasource/hooks.go (5)
Hooks(17-21)OnReceiveEventsHooks(34-38)OnReceiveEventsFn(14-14)OnPublishEventsHooks(29-31)OnPublishEventsFn(12-12)router/pkg/metric/noop_stream_metrics.go (1)
NewNoopStreamMetricStore(15-15)
router-tests/modules/stream-receive/module.go (4)
router/pkg/mcpserver/util.go (1)
Logger(6-9)router/core/subscriptions_modules.go (2)
StreamReceiveEventHandlerContext(247-263)StreamReceiveEventHandler(265-275)router/pkg/pubsub/datasource/provider.go (1)
StreamEvents(61-63)router/core/modules.go (3)
ModuleContext(160-164)Module(52-54)ModuleInfo(44-50)
router-tests/modules/stream-publish/module.go (3)
router/core/subscriptions_modules.go (2)
StreamPublishEventHandlerContext(277-290)StreamPublishEventHandler(292-299)router/pkg/pubsub/datasource/provider.go (1)
StreamEvents(61-63)router/core/modules.go (3)
ModuleContext(160-164)Module(52-54)ModuleInfo(44-50)
🔇 Additional comments (3)
router-tests/modules/stream-publish/module.go (1)
1-51: LGTM! Past issues resolved, module implementation is solid.The nil logger guard and overall structure look good. The module correctly implements the
StreamPublishEventHandlerinterface with appropriate safety checks.router-tests/modules/stream-receive/module.go (1)
1-51: LGTM! Package naming corrected, implementation is sound.The module correctly implements the
StreamReceiveEventHandlerinterface with proper safety checks. Past package name issue has been resolved.router/pkg/pubsub/pubsub_test.go (1)
6-6: LGTM! Tests correctly updated for hooks integration.The test changes properly wire the new
datasource.Hooksparameter through allbuild()andBuildProvidersAndDataSources()calls. TheSetHooksmock expectations correctly omit.Return()sinceSetHooksis a void method. Passing emptydatasource.Hooks{}in all tests is appropriate for unit tests that focus on the build plumbing rather than hook behavior.Also applies to: 65-68, 75-75, 131-131, 186-186, 244-247, 254-254, 305-305, 358-358, 411-411
|
|
||
| }, "unable to close client before timeout") | ||
|
|
||
| requestLog := xEnv.Observer().FilterMessage("SubscriptionOnStart Hook has been run") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about using a mock implementation that increases a counter and assert on the number of hook calls instead of filtering on a message?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, using a counter on the custom module now, which increases whenever the hook is called.
Commit: chore: use counter to count hook calls
| } | ||
| ctx.WriteEvent((&kafka.MutableEvent{ | ||
| Key: []byte("1"), | ||
| Data: []byte(`{"id": 1, "__typename": "Employee"}`), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we access the id from the ctx here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you specify what id you mean? The one from Data? Unfortunately no, thats not possible at the moment
| "startSubscriptionModule": start_subscription.StartSubscriptionModule{ | ||
| Callback: func(ctx core.SubscriptionOnStartHandlerContext) error { | ||
| callbackCalled <- true | ||
| return core.NewHttpGraphqlError("subscription closed", http.StatusText(http.StatusOK), http.StatusOK) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the intention here to gracefully stop a subscription or is this just returning a generic error? If yes, could we find a more idiomatic way to close a subscription?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The intention is to give hook developers the option to abort the subscription during initialization phase. So suppose return from the hook like this
return core.NewHttpGraphqlError("Forbidden", http.StatusText(http.StatusForbidden), http.StatusForbidden)On the connection, this is what you get
> {"type":"connection_init","payload":{}}
< {"type":"connection_ack"}
> {"id":"1","type":"subscribe","payload":{"query":"subscription { employeeUpdatedMyKafka(employeeID:1) { id details { forename surname } } }"}}
< {"id":"1","type":"error","payload":[{"message":"Forbidden","extensions":{"statusCode":403,"code":"Forbidden"}}]}
Disconnected (code: 1000, reason: "Normal closure")
So first an error message is returned after a subscription start (not at connection_init), than the websockt connection is closed normally by the router. Do you have any specific ideas in mind on how to close the connection instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we return a special error?
return core.CloseSubscription{
Reason: "my custom message"
}You can then do errors.As/errors.Is to react.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After discussion with Jens, I now have added a dedicated type for this: core.StreamHandlerError.
Commit: chore: reintroduce StreamHandlerError
adr/cosmo-streams-v1.md
Outdated
| SubscriptionEventConfiguration() datasource.SubscriptionEventConfiguration | ||
| // WriteEvent writes an event to the stream of the current subscription | ||
| // It returns true if the event was written to the stream, false if the event was dropped | ||
| WriteEvent(event datasource.StreamEvent) bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should have much stronger naming differentiation between WriteEvent and NewEvent.
My understanding from reading the tests is that WriteEvent goes through the event broker (e.g. Kafka) so it's fan out to all consumers vs. NewEvent is just local to the Subscription but I'm not entirely sure.
I've searched through the ADR and it's not clear what the differentiation is.
Can you clarify?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see the confusion.
WriteEvent creates an event inside the router. For the router it looks like an event coming straight from a broker. It resolves the fields asked for by the operation and returns the results to the client. The method expects an event object, which you can create with NewEvent.
I will update the godoc and ADR to make this more clear.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, WriteEvent is not really writing an event to the broker, which I thought it would, but instead it's just emitting a new event, but not to other Routers through the broker, only to the current Router? Can we make it clear that this event is only for the current subscription? or is it for all Router local subscriptions? and could it also be delivered back to the broker to fan it out? Maybe we want to offer the user options. In any way, we want to be very explicit in regards to naming such that the side effects are very clear. "WriteEvent" is very generic and can mean a lot of things.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, WriteEvent is not really writing an event to the broker, which I thought it would, but instead it's just emitting a new event, but not to other Routers through the broker, only to the current Router?
Yes, it's only emitting an event inside the router. No broker will ever receive that message.
Can we make it clear that this event is only for the current subscription? or is it for all Router local subscriptions?
The event is local to the subscription the hook is being called for. Other subscriptions will never see this event. I will address this in godoc and Cosmo docs.
and could it also be delivered back to the broker to fan it out? Maybe we want to offer the user options.
It's a good idea but at the moment we don't support this. Right now the main goal of the WriteEvent method was to provide hook developers a way to push initial data to new subscription clients, so they can build up state without waiting for an actual event to arrive from the broker.
But I do see how it can be valuable to push events to brokers when a new client connects. Not sure how complicated it is to add but I believe it's not something we can easily add to this hook. Probably involves some more work. Will look into it to see if we could easily add this.
Overall I agree the method sounds too vague and doesn't really tell what it's actually doing. Gonna rename the method and clarify in godoc and Cosmo Streams what's actually happening.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed the method to EmitLocalEvent with a more detailed godoc.
Commit: chore: rename WriteEvent to EmitLocalEvent
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should update the ADR, right?
The core.NewHttpGraphqlError() method returns an error, which does not fit the OnPublishEvent hook very well. Instead we return a generic error, which results in the same thing. The error handling does not behave differently because of it.
If this hook returns an error it is not wanted that the error is logged. Other Cosmo Streams hooks don't do it. The reason is we want the hook developer to decide what log level to use for errors, or if he doesn't want to log an error at all. This error log handling is already in place for the other two hooks but somehow was forgotten on this one.
This makes the OnPublishEvent hook be more consistent in nil event filtering with the OnReceiveEvent hook.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (2)
router/pkg/pubsub/datasource/pubsubprovider.go (1)
21-53: Consider adding observability for hook failures and nil event cleanup.The panic recovery and hook processing would benefit from additional context:
Hook identification in panic recovery: When a hook panics, the stack trace doesn't identify which specific hook failed, making debugging harder in production when multiple hooks are registered.
Silent nil event removal: Lines 48-50 silently filter nil events. If a hook incorrectly returns nil events, this will mask the bug without any diagnostic signal.
Hook error logging: When a hook returns an error (line 43), the loop breaks but the error isn't logged here, losing context about which hook in the chain failed.
Consider these improvements:
func (p *PubSubProvider) applyPublishEventHooks(ctx context.Context, cfg PublishEventConfiguration, events []StreamEvent) (currentEvents []StreamEvent, err error) { defer func() { if r := recover(); r != nil { p.Logger. WithOptions(zap.AddStacktrace(zapcore.ErrorLevel)). - Error("[Recovery from handler panic]", zap.Any("error", r)) + Error("[Recovery from publish hook panic]", + zap.Any("error", r), + zap.String("provider_id", cfg.ProviderID()), + zap.String("root_field", cfg.RootFieldName())) switch v := r.(type) { case error: err = v default: err = fmt.Errorf("%v", r) } } }() currentEvents = events - for _, hook := range p.hooks.OnPublishEvents.Handlers { + for i, hook := range p.hooks.OnPublishEvents.Handlers { currentEvents, err = hook(ctx, cfg, currentEvents, p.eventBuilder) if err != nil { + p.Logger.Error("publish hook returned error", + zap.Int("hook_index", i), + zap.Error(err), + zap.String("provider_id", cfg.ProviderID())) break } } + nilCount := 0 currentEvents = slices.DeleteFunc(currentEvents, func(event StreamEvent) bool { - return event == nil + isNil := event == nil + if isNil { + nilCount++ + } + return isNil }) + if nilCount > 0 { + p.Logger.Warn("removed nil events after hook processing", + zap.Int("nil_count", nilCount), + zap.String("provider_id", cfg.ProviderID())) + } return currentEvents, err }router-tests/modules/stream_receive_test.go (1)
695-727: Consider adding a small sleep in the busy loop.The loop at lines 695-727 doesn't yield, which could waste CPU cycles. While this is acceptable in tests and the loop should exit quickly, consider adding a small
time.Sleep(time.Millisecond)if CI performance becomes a concern.for { current := currentHandlers.Load() max := maxCurrentHandlers.Load() if current > max { maxCurrentHandlers.CompareAndSwap(max, current) } if current >= int32(tc.maxConcurrent) { // wait to see if the subscription-updater spawns too many concurrent hooks, // i.e. exceeding the number of configured max concurrent hooks. deadline := time.Now().Add(300 * time.Millisecond) for time.Now().Before(deadline) { if currentHandlers.Load() > int32(tc.maxConcurrent) { break } + time.Sleep(time.Millisecond) } break } // Let hooks continue if we never reach a updater batch size = tc.maxConcurrent // because there are not enough remaining clients to be updated. // i.e. it could be the last round of updates: // 100 clients, now in comes a new event from broker, max concurrent hooks = 30. // First round: 30 hooks run, 70 remaining. // Second round: 30 hooks run, 40 remaining. // Third round: 30 hooks run, 10 remaining. // Fourth round: 10 hooks run, then we end up here because remainingSubs < tc.maxConcurrent. remainingSubs := tc.numSubscribers - int(finishedHandlers.Load()) if remainingSubs < tc.maxConcurrent { break } + time.Sleep(time.Millisecond) }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
router-tests/modules/start_subscription_test.go(1 hunks)router-tests/modules/stream_publish_test.go(1 hunks)router-tests/modules/stream_receive_test.go(1 hunks)router/pkg/pubsub/datasource/pubsubprovider.go(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- router-tests/modules/stream_publish_test.go
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
📚 Learning: 2025-08-28T09:18:10.121Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:100-108
Timestamp: 2025-08-28T09:18:10.121Z
Learning: In router-tests/http_subscriptions_test.go heartbeat tests, the message ordering should remain strict with data messages followed by heartbeat messages, as the timing is deterministic and known by design in the Cosmo router implementation.
Applied to files:
router-tests/modules/stream_receive_test.gorouter-tests/modules/start_subscription_test.go
📚 Learning: 2025-10-01T20:39:16.113Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.
Applied to files:
router-tests/modules/stream_receive_test.gorouter-tests/modules/start_subscription_test.go
📚 Learning: 2025-08-28T09:17:49.477Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
Applied to files:
router-tests/modules/stream_receive_test.gorouter-tests/modules/start_subscription_test.go
📚 Learning: 2025-11-13T10:10:47.680Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2329
File: router/pkg/pubsub/datasource/subscription_event_updater.go:86-88
Timestamp: 2025-11-13T10:10:47.680Z
Learning: In router/pkg/pubsub/datasource/subscription_event_updater.go, the SetHooks method is intentionally designed to only replace hook handlers, not reconfigure timeout or semaphore settings. The timeout and semaphore fields are meant to be set once during construction via NewSubscriptionEventUpdater and remain immutable. If different timeout or concurrency settings are needed, a new updater instance should be created rather than modifying the existing one.
Applied to files:
router-tests/modules/stream_receive_test.gorouter-tests/modules/start_subscription_test.gorouter/pkg/pubsub/datasource/pubsubprovider.go
🧬 Code graph analysis (3)
router-tests/modules/stream_receive_test.go (11)
router-tests/testenv/testenv.go (2)
Run(105-122)Environment(1733-1769)router/pkg/config/config.go (4)
Config(1017-1092)Graph(24-29)SubscriptionHooksConfiguration(648-650)OnReceiveEventsConfiguration(652-655)router-tests/modules/stream-receive/module.go (1)
StreamReceiveModule(12-15)router/core/router.go (5)
Option(172-172)WithModulesConfig(1679-1683)WithCustomModules(1786-1790)WithAccessController(1839-1843)WithSubscriptionHooks(2143-2148)router-tests/events/utils.go (2)
KafkaEnsureTopicExists(15-32)ProduceKafkaMessage(34-53)router-tests/testenv/utils.go (1)
AwaitChannelWithT(10-19)router/core/subscriptions_modules.go (1)
StreamReceiveEventHandlerContext(247-263)router/pkg/pubsub/datasource/provider.go (4)
StreamEvents(61-63)StreamEvent(86-91)NewStreamEvents(81-83)MutableStreamEvent(94-98)router/pkg/authentication/jwks_token_decoder.go (3)
NewJwksTokenDecoder(78-241)JWKSConfig(44-56)TokenDecoder(21-23)router/pkg/authentication/http_header_authenticator.go (2)
HttpHeaderAuthenticatorOptions(81-89)NewHttpHeaderAuthenticator(93-113)router/core/access_controller.go (2)
NewAccessController(37-44)AccessControllerOptions(20-25)
router-tests/modules/start_subscription_test.go (7)
router-tests/testenv/testenv.go (3)
Run(105-122)LogObservationConfig(388-391)Environment(1733-1769)router-tests/modules/start-subscription/module.go (1)
StartSubscriptionModule(13-17)router/core/router.go (3)
Option(172-172)WithModulesConfig(1679-1683)WithCustomModules(1786-1790)router-tests/testenv/utils.go (1)
AwaitChannelWithT(10-19)router/core/subscriptions_modules.go (1)
SubscriptionOnStartHandlerContext(15-31)router/pkg/pubsub/kafka/engine_datasource.go (2)
SubscriptionEventConfiguration(91-95)MutableEvent(62-66)router/core/http_graphql_error.go (1)
NewHttpGraphqlError(22-28)
router/pkg/pubsub/datasource/pubsubprovider.go (4)
router/pkg/pubsub/datasource/provider.go (4)
Adapter(24-28)PublishEventConfiguration(108-112)StreamEvent(86-91)SubscriptionEventConfiguration(101-105)router/pkg/pubsub/datasource/hooks.go (1)
Hooks(17-21)router/pkg/pubsub/datasource/subscription_datasource.go (1)
EventBuilderFn(16-16)router/pkg/pubsub/datasource/subscription_event_updater.go (1)
SubscriptionEventUpdater(21-26)
🔇 Additional comments (13)
router/pkg/pubsub/datasource/pubsubprovider.go (4)
81-94: Verify error handling semantics when hooks fail.The current implementation publishes events even when
applyPublishEventHooksreturns an error:processedEvents, hooksErr := p.applyPublishEventHooks(ctx, cfg, events) errPublish := p.Adapter.Publish(ctx, cfg, processedEvents) if errPublish != nil { return errPublish } return hooksErrThis means:
- If hooks fail but publish succeeds, the hook error is returned to the caller after events have already been published
- If hooks panic or return an error, publishing still proceeds with whatever events the hooks produced before failing
Is this the intended behavior? Typically, middleware hooks either:
- Gate the operation: Hook failure prevents publishing entirely
- Observe only: Hook failure is logged but doesn't affect the return value
The current hybrid approach (publish then return hook error) could lead to confusion about whether the publish succeeded.
Consider clarifying the intended semantics:
Option 1: Hooks gate publishing
processedEvents, hooksErr := p.applyPublishEventHooks(ctx, cfg, events) if hooksErr != nil { return hooksErr // Don't publish if hooks fail } return p.Adapter.Publish(ctx, cfg, processedEvents)Option 2: Hooks are observability-only
processedEvents, hooksErr := p.applyPublishEventHooks(ctx, cfg, events) if hooksErr != nil { p.Logger.Error("publish hooks failed but continuing", zap.Error(hooksErr)) } return p.Adapter.Publish(ctx, cfg, processedEvents) // Only return publish errors
77-79: LGTM!Clean delegation to the adapter. Hook processing for subscriptions is correctly handled at the updater layer rather than in this provider wrapper.
96-98: LGTM!Simple hook configuration method that correctly follows the design pattern where hooks can be set independently of other configuration.
Based on learnings
100-112: LGTM - Previous nil logger concern addressed.The constructor correctly normalizes
nillogger tozap.NewNop(), which prevents panics inapplyPublishEventHooksand other logging paths. All fields including the neweventBuilderare properly initialized.router-tests/modules/stream_receive_test.go (8)
1-36: LGTM!Package structure and imports are well-organized for comprehensive integration testing of the StreamReceiveModule with Kafka.
37-112: LGTM!Test properly verifies hook invocation with appropriate cleanup and timeout handling.
114-201: LGTM!Test correctly demonstrates the clone-then-mutate pattern for event transformation and verifies the results.
298-460: LGTM!Comprehensive test of authentication-based event transformation with proper setup, assertions, and cleanup for multiple clients.
462-562: LGTM!Test correctly verifies custom header access in hooks with proper use of canonical header keys.
564-640: LGTM!Test properly verifies that hook errors trigger graceful subscription closure and cleanup.
736-825: LGTM!Comprehensive concurrency test with proper setup, multiple clients, and verification that concurrent handler limits are respected.
827-951: LGTM!Test correctly verifies that hooks exceeding the timeout don't block subsequent events, resulting in out-of-order delivery. The logic and assertions properly demonstrate the timeout mechanism.
router-tests/modules/start_subscription_test.go (1)
21-612: Solid end-to-end coverage ofSubscriptionOnStartand engine flowsThe suite thoroughly exercises: basic hook invocation, Kafka/stream
WriteEventbehavior, per-subscription targeting, error propagation (including HTTP GraphQL error metadata), engine-based subscriptions, and interaction withOnOriginResponse, all viatestenv.Runand the websocket client. This gives strong regression coverage for the new Cosmo Streams v1 start-subscription hook wiring.
| callbackCalled := make(chan bool) | ||
|
|
||
| cfg := config.Config{ | ||
| Graph: config.Graph{}, | ||
| Modules: map[string]interface{}{ | ||
| "startSubscriptionModule": start_subscription.StartSubscriptionModule{ | ||
| Callback: func(ctx core.SubscriptionOnStartHandlerContext) error { | ||
| callbackCalled <- true | ||
| return core.NewHttpGraphqlError("subscription closed", http.StatusText(http.StatusOK), http.StatusOK) | ||
| }, | ||
| }, | ||
| }, | ||
| } | ||
|
|
||
| testenv.Run(t, &testenv.Config{ | ||
| RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, | ||
| EnableKafka: true, | ||
| RouterOptions: []core.Option{ | ||
| core.WithModulesConfig(cfg.Modules), | ||
| core.WithCustomModules(&start_subscription.StartSubscriptionModule{}), | ||
| }, | ||
| LogObservation: testenv.LogObservationConfig{ | ||
| Enabled: true, | ||
| LogLevel: zapcore.InfoLevel, | ||
| }, | ||
| }, func(t *testing.T, xEnv *testenv.Environment) { | ||
| var subscriptionOne struct { | ||
| employeeUpdatedMyKafka struct { | ||
| ID float64 `graphql:"id"` | ||
| Details struct { | ||
| Forename string `graphql:"forename"` | ||
| Surname string `graphql:"surname"` | ||
| } `graphql:"details"` | ||
| } `graphql:"employeeUpdatedMyKafka(employeeID: $employeeID)"` | ||
| } | ||
|
|
||
| surl := xEnv.GraphQLWebSocketSubscriptionURL() | ||
| client := graphql.NewSubscriptionClient(surl) | ||
|
|
||
| vars := map[string]interface{}{ | ||
| "employeeID": 3, | ||
| } | ||
| type kafkaSubscriptionArgs struct { | ||
| dataValue []byte | ||
| errValue error | ||
| } | ||
| subscriptionArgsCh := make(chan kafkaSubscriptionArgs, 1) | ||
| subscriptionOneID, err := client.Subscribe(&subscriptionOne, vars, func(dataValue []byte, errValue error) error { | ||
| subscriptionArgsCh <- kafkaSubscriptionArgs{ | ||
| dataValue: dataValue, | ||
| errValue: errValue, | ||
| } | ||
| return nil | ||
| }) | ||
| require.NoError(t, err) | ||
| require.NotEmpty(t, subscriptionOneID) | ||
|
|
||
| clientRunCh := make(chan error) | ||
| go func() { | ||
| clientRunCh <- client.Run() | ||
| }() | ||
|
|
||
| xEnv.WaitForSubscriptionCount(1, time.Second*10) | ||
| <-callbackCalled | ||
| xEnv.WaitForSubscriptionCount(0, time.Second*10) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
Unbuffered callbackCalled can deadlock the router goroutine running the hook
Here the hook does:
callbackCalled := make(chan bool)
Callback: func(ctx core.SubscriptionOnStartHandlerContext) error {
callbackCalled <- true
return core.NewHttpGraphqlError(...)
}
...
xEnv.WaitForSubscriptionCount(1, time.Second*10)
<-callbackCalledIf the router invokes Callback before the test goroutine reaches <-callbackCalled, the send on the unbuffered channel can block the router goroutine. Depending on the internal ordering, this can prevent the subscription from ever fully registering, so WaitForSubscriptionCount(1, ...) may never succeed, leading to a hang.
Use a buffered channel (and ideally a timeout-based wait) so the hook is never blocked by the test’s receive. For example:
- callbackCalled := make(chan bool)
+ callbackCalled := make(chan bool, 1)
...
- Callback: func(ctx core.SubscriptionOnStartHandlerContext) error {
- callbackCalled <- true
- return core.NewHttpGraphqlError("subscription closed", http.StatusText(http.StatusOK), http.StatusOK)
- },
+ Callback: func(ctx core.SubscriptionOnStartHandlerContext) error {
+ callbackCalled <- true
+ return core.NewHttpGraphqlError("subscription closed", http.StatusText(http.StatusOK), http.StatusOK)
+ },
...
- xEnv.WaitForSubscriptionCount(1, time.Second*10)
- <-callbackCalled
+ xEnv.WaitForSubscriptionCount(1, time.Second*10)
+ testenv.AwaitChannelWithT(t, 10*time.Second, callbackCalled, func(t *testing.T, called bool) {
+ require.True(t, called)
+ }, "StartSubscription callback was not invoked")
xEnv.WaitForSubscriptionCount(0, time.Second*10)This keeps the assertion while removing the risk that the hook blocks the router.
I'll verify the review comment by examining the actual code and checking for the suggested helper function.
---
🏁 Script executed:
```shell
#!/bin/bash
# Locate and read the test file to verify the code snippet
fd -t f "start_subscription_test.go" --path "*router-tests*" --exec cat {}
Length of output: 233
🏁 Script executed:
#!/bin/bash
# Search for the AwaitChannelWithT helper function
rg -n "AwaitChannelWithT" --type goLength of output: 18185
🏁 Script executed:
#!/bin/bash
# Search for callback-related patterns in subscription code
rg -n "OnStartHandlerContext\|SubscriptionOnStart" --type go -C2Length of output: 0
🏁 Script executed:
#!/bin/bash
# Check for similar channel patterns in other tests
rg -n "callbackCalled|make\(chan.*bool\)" --type go router-tests/ -A2 -B2Length of output: 1695
🏁 Script executed:
#!/bin/bash
# Look for testenv helper functions to understand available utilities
rg -n "func.*Await\|func.*Wait" --type go testenv/ -A1Length of output: 48
---
🏁 Script executed:
```shell
#!/bin/bash
# Get the full AwaitChannelWithT implementation
rg -n "func AwaitChannelWithT" --type go -A10
Length of output: 654
🏁 Script executed:
#!/bin/bash
# Get exact context around the problematic code in start_subscription_test.go lines 188-253
rg -n "callbackCalled := make" --type go -B5 -A65Length of output: 6006
🏁 Script executed:
#!/bin/bash
# Look for subscription lifecycle/callback execution in core module
fd -type f "*.go" -path "*/core/*" --exec grep -l "SubscriptionOnStartHandlerContext\|Callback" {} \;Length of output: 170
🏁 Script executed:
#!/bin/bash
# Check testenv helpers location
find . -type f -name "utils.go" -exec grep -l "AwaitChannelWithT" {} \;Length of output: 62
🏁 Script executed:
#!/bin/bash
# Get core interface definition for the callback
rg -n "SubscriptionOnStartHandlerContext" --type go -B2 -A2Length of output: 6262
Use a buffered channel and timeout-protected receive for callbackCalled
The unbuffered channel at line 188 can deadlock the router goroutine if the callback executes before the test reaches the blocking receive at line 251. Since the callback is invoked by the router during subscription setup, and the test flow involves asynchronous goroutines, the timing is non-deterministic.
Change line 188 to buffer the channel: callbackCalled := make(chan bool, 1)
Replace the blocking receive at line 251 with the timeout-protected helper already used throughout the codebase:
testenv.AwaitChannelWithT(t, 10*time.Second, callbackCalled, func(t *testing.T, called bool) {
require.True(t, called)
}, "StartSubscription callback was not invoked")This prevents the router goroutine from blocking and aligns with the pattern used in 60+ other tests in the repository.
🤖 Prompt for AI Agents
In router-tests/modules/start_subscription_test.go around lines 188 to 253, the
test creates an unbuffered channel callbackCalled which can deadlock if the
router invokes the callback before the test's blocking receive; change the
channel creation to a buffered channel (capacity 1) and replace the direct
blocking receive with the repo's timeout-protected helper
testenv.AwaitChannelWithT(t, 10*time.Second, callbackCalled, ...) to assert the
callback was invoked, using the provided failure message "StartSubscription
callback was not invoked".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check the current state of the callbackCalled channel and its usage
rg -n "callbackCalled" --type go router-tests/modules/start_subscription_test.go -B2 -A5
Length of output: 882
@dkorittki, thank you for addressing part of the issue! I can see you've correctly replaced the blocking receive with testenv.AwaitChannelWithT at line 260, which is great. However, the channel at line 194 is still unbuffered:
callbackCalled := make(chan bool) // Still unbufferedThis needs to be:
callbackCalled := make(chan bool, 1) // Buffered with capacity 1Without the buffer, the router goroutine can still deadlock when it executes the callback at line 199 (callbackCalled <- true) if the test hasn't yet reached the AwaitChannelWithT call. The buffer ensures the send never blocks, regardless of timing.
Could you please add the buffer capacity to complete the fix?
🧠 Learnings used
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:100-108
Timestamp: 2025-08-28T09:18:10.121Z
Learning: In router-tests/http_subscriptions_test.go heartbeat tests, the message ordering should remain strict with data messages followed by heartbeat messages, as the timing is deterministic and known by design in the Cosmo router implementation.
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2329
File: router/pkg/pubsub/datasource/subscription_event_updater.go:86-88
Timestamp: 2025-11-13T10:10:47.680Z
Learning: In router/pkg/pubsub/datasource/subscription_event_updater.go, the SetHooks method is intentionally designed to only replace hook handlers, not reconfigure timeout or semaphore settings. The timeout and semaphore fields are meant to be set once during construction via NewSubscriptionEventUpdater and remain immutable. If different timeout or concurrency settings are needed, a new updater instance should be created rather than modifying the existing one.
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using an unbuffered channel is done on purpose. It ensures the hook, who writes to the channel is blocked until the test itself reads from the channel (line 260). We need this blocking so we can measure in line 259 the client connected and in line 263 that the client got disconnected due to the error being returned by the hook.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find where SubscriptionOnStart hook is invoked to understand the timing
rg -n "SubscriptionOnStart|OnSubscriptionStart" --type go -B3 -A5 router/
Length of output: 35521
🏁 Script executed:
#!/bin/bash
# Look for subscription registration and count increment logic
rg -n "WaitForSubscriptionCount|SubscriptionCount" --type go -B2 -A2 router-tests/testenv/
Length of output: 295
🏁 Script executed:
#!/bin/bash
# Check the subscription lifecycle in the core to understand when hooks are called
ast-grep --pattern 'func $_($$) {
$$$
SubscriptionOnStart($$$)
$$$
}'
Length of output: 0
@dkorittki, thank you for the clarification! I understand your reasoning now—you're using the blocking behavior to ensure the hook has executed before verifying the disconnection.
Your approach assumes a specific timing: that the subscription count increments to 1 before the SubscriptionOnStart hook completes. If this assumption holds, your test sequence works correctly:
- Subscription registers → count becomes 1
- Hook blocks on
callbackCalled <- true - Line 259:
WaitForSubscriptionCount(1)succeeds - Line 260:
AwaitChannelWithTreads from channel → hook unblocks and returns error - Line 263:
WaitForSubscriptionCount(0)verifies closure
Since you've already added AwaitChannelWithT at line 260, the test has timeout protection—if the timing assumption is violated, the test fails rather than hanging indefinitely, which is acceptable.
However, this timing dependency is implicit and could be fragile if the subscription lifecycle implementation changes. For robustness, I'd suggest one of two approaches:
-
Keep unbuffered + add a comment documenting the timing assumption:
// Unbuffered channel ensures hook blocks until we verify subscription is registered // This synchronization depends on subscription count incrementing before SubscriptionOnStart completes callbackCalled := make(chan bool)
-
Use buffered channel for explicit ordering without timing dependency:
callbackCalled := make(chan bool, 1) // Hook never blocks xEnv.WaitForSubscriptionCount(1, time.Second*10) // Verify subscription registered testenv.AwaitChannelWithT(t, 10*time.Second, callbackCalled, ...) // Verify hook was called xEnv.WaitForSubscriptionCount(0, time.Second*10) // Verify closure
Both are valid—option 1 preserves your synchronization intent, while option 2 is more defensive against implementation changes. Your call based on whether you want to strictly enforce the ordering or just verify it happened.
🧠 Learnings used
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:100-108
Timestamp: 2025-08-28T09:18:10.121Z
Learning: In router-tests/http_subscriptions_test.go heartbeat tests, the message ordering should remain strict with data messages followed by heartbeat messages, as the timing is deterministic and known by design in the Cosmo router implementation.
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2329
File: router/pkg/pubsub/datasource/subscription_event_updater.go:86-88
Timestamp: 2025-11-13T10:10:47.680Z
Learning: In router/pkg/pubsub/datasource/subscription_event_updater.go, the SetHooks method is intentionally designed to only replace hook handlers, not reconfigure timeout or semaphore settings. The timeout and semaphore fields are meant to be set once during construction via NewSubscriptionEventUpdater and remain immutable. If different timeout or concurrency settings are needed, a new updater instance should be created rather than modifying the existing one.
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2172
File: router/core/graph_server.go:0-0
Timestamp: 2025-09-17T20:55:39.456Z
Learning: The Initialize method in router/internal/retrytransport/manager.go has been updated to properly handle feature-flag-only subgraphs by collecting subgraphs from both routerConfig.GetSubgraphs() and routerConfig.FeatureFlagConfigs.ConfigByFeatureFlagName, ensuring all subgraphs receive retry configuration.
core.NewHttpGraphqlError() is not really the best way to describe
an error in hooks processing. We have to set http specific codes
and status texts, which does not really fit well with subscriptions.
I created a new error for this called StreamHookError, which lets you
pass a message. It can be returned from a hook:
return core.StreamHandlerError{Message: "my hook error"}
and it gets sent to the subscription client
{"id":"1","type":"error","payload":[{"message":"my hook error"}]}
Afterwards we close the connection. This behaviour remains unchanged.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (2)
router-tests/modules/start_subscription_test.go (2)
188-251: UnbufferedcallbackCalledchannel still creates deadlock riskThe unbuffered channel at line 188 can deadlock if the hook callback executes before the test's blocking receive at line 251. The hook runs during subscription setup (before or during
WaitForSubscriptionCount), and the timing makes it likely the send at line 195 occurs before line 251, blocking the router goroutine indefinitely.Buffer the channel and use timeout-protected receive:
-callbackCalled := make(chan bool) +callbackCalled := make(chan bool, 1)At line 251, replace the blocking receive:
xEnv.WaitForSubscriptionCount(1, time.Second*10) -<-callbackCalled +testenv.AwaitChannelWithT(t, 10*time.Second, callbackCalled, func(t *testing.T, called bool) { + require.True(t, called) +}, "StartSubscription callback was not invoked") xEnv.WaitForSubscriptionCount(0, time.Second*10)
328-371: UnbufferedsubscriptionOneArgsChcreates deadlock risk and ineffective assertionLine 328 creates an unbuffered channel but the test never receives from it (only
subscriptionTwoArgsChis consumed at line 357). If the subscription callback at lines 330-334 executes unexpectedly, the send will block forever. The cleanup assertion at line 371 usingrequire.Len(..., 0)is meaningless becauselen()on an unbuffered channel always returns 0—it cannot detect whether the callback was invoked.Buffer the channel so unexpected sends are detectable:
-subscriptionOneArgsCh := make(chan kafkaSubscriptionArgs) +subscriptionOneArgsCh := make(chan kafkaSubscriptionArgs, 1)The cleanup assertion at line 371 becomes meaningful with the buffer:
t.Cleanup(func() { - require.Len(t, subscriptionOneArgsCh, 0) + require.Len(t, subscriptionOneArgsCh, 0, "unexpected event delivered to non-target subscription") })
🧹 Nitpick comments (5)
adr/cosmo-streams-v1.md (4)
24-25: Fix wordiness and grammar in hook descriptions.Line 24 uses "prior to" which is unnecessarily wordy; line 25 uses "going to" when "will" is more appropriate for certain actions.
Apply this diff:
- StreamReceiveEventHandler: Triggered for each client/subscription when a batch of events is received from the provider, prior to delivery. - StreamPublishEventHandler: Called each time a batch of events is going to be sent to the provider. + StreamReceiveEventHandler: Triggered for each client/subscription when a batch of events is received from the provider, before delivery. + StreamPublishEventHandler: Called each time a batch of events will be sent to the provider.
179-179: Fix compound adjective hyphenation and text inconsistency.Two instances of missing hyphens in compound adjectives:
- Line 179: "Federation compatible Router events" should be "Federation-compatible Router events"
- Line 185: "backwards compatible way" should be "backward-compatible way" (also corrects spelling; "backward" is standard for "backward-compatible")
Apply this diff:
- Data mapping: Transforming events data from the format that could be used by the external system to/from Federation compatible Router events + Data mapping: Transforming events data from the format that could be used by the external system to/from Federation-compatible Router events - **Event filtering**: Filtering events using custom logic - **Event creation**: Creating new events from scratch using `ctx.NewEvent(data)` method available in all handler contexts ## Backwards Compatibility - The new hooks can be integrated in the router in a fully backwards compatible way. + The new hooks can be integrated in the router in a fully backward-compatible way.Also applies to: 185-185
195-195: Fix Markdown heading and emphasis style.Line 195 uses emphasis (
__All examples...__) instead of proper Markdown heading syntax. Additionally, the strong style uses underscores instead of asterisks, which is inconsistent with the rest of the document.Apply this diff:
-__All examples reflect the current implementation and match the actual API__ +## All examples reflect the current implementation and match the actual API
249-252: Replace hard tabs with spaces in code examples.The code examples contain hard tabs (indentation at lines 249, 250, 251, 252, 254, 388, 389, 390, 391, 397, 398, 405, 406) that should be replaced with spaces for consistency with the rest of the Markdown document and to ensure proper rendering across different editors and platforms.
Ensure all indentation in the code blocks uses spaces (typically 4 or 8 per indent level) instead of tab characters. This is flagged by
markdownlintruleMD010(no-hard-tabs).Also applies to: 254-254, 388-391, 397-398, 405-406
router-tests/modules/start_subscription_test.go (1)
426-461: Unbuffered channel with ineffective cleanup assertionThe unbuffered channel at line 426 makes the cleanup assertion at line 461 meaningless. After receiving the expected error message at lines 446-450,
require.Len(t, subscriptionOneArgsCh, 0)always passes becauselen()on an unbuffered channel always returns 0. The assertion cannot detect if additional unexpected messages were sent.Buffer the channel to make the cleanup assertion meaningful:
-subscriptionOneArgsCh := make(chan kafkaSubscriptionArgs) +subscriptionOneArgsCh := make(chan kafkaSubscriptionArgs, 2)The cleanup check at line 461 will now properly detect unexpected additional messages.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
adr/cosmo-streams-v1.md(1 hunks)router-tests/modules/start_subscription_test.go(1 hunks)router/core/errors.go(3 hunks)router/core/graphql_handler.go(2 hunks)router/core/subscriptions_modules.go(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- router/core/errors.go
- router/core/subscriptions_modules.go
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
📚 Learning: 2025-08-28T09:18:10.121Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:100-108
Timestamp: 2025-08-28T09:18:10.121Z
Learning: In router-tests/http_subscriptions_test.go heartbeat tests, the message ordering should remain strict with data messages followed by heartbeat messages, as the timing is deterministic and known by design in the Cosmo router implementation.
Applied to files:
router-tests/modules/start_subscription_test.go
📚 Learning: 2025-08-28T09:17:49.477Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
Applied to files:
router-tests/modules/start_subscription_test.goadr/cosmo-streams-v1.md
📚 Learning: 2025-10-01T20:39:16.113Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.
Applied to files:
router-tests/modules/start_subscription_test.go
📚 Learning: 2025-11-13T10:10:47.680Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2329
File: router/pkg/pubsub/datasource/subscription_event_updater.go:86-88
Timestamp: 2025-11-13T10:10:47.680Z
Learning: In router/pkg/pubsub/datasource/subscription_event_updater.go, the SetHooks method is intentionally designed to only replace hook handlers, not reconfigure timeout or semaphore settings. The timeout and semaphore fields are meant to be set once during construction via NewSubscriptionEventUpdater and remain immutable. If different timeout or concurrency settings are needed, a new updater instance should be created rather than modifying the existing one.
Applied to files:
router-tests/modules/start_subscription_test.go
📚 Learning: 2025-11-16T11:52:34.064Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2273
File: router/pkg/pubsub/redis/engine_datasource.go:37-42
Timestamp: 2025-11-16T11:52:34.064Z
Learning: In router/pkg/pubsub/redis/engine_datasource.go (and similar files for kafka/nats), the MutableEvent.GetData() method intentionally returns the internal Data slice without cloning, while Event.GetData() clones the slice. This is by design: MutableEvent is designed to be mutable (so callers can modify the data), while Event is immutable (cloning prevents modification). This difference is part of the Cosmo Streams v1 hook interface design.
Applied to files:
adr/cosmo-streams-v1.md
🧬 Code graph analysis (2)
router-tests/modules/start_subscription_test.go (7)
router-tests/testenv/testenv.go (3)
Run(105-122)LogObservationConfig(388-391)Environment(1733-1769)router-tests/modules/start-subscription/module.go (1)
StartSubscriptionModule(13-17)router/core/router.go (3)
Option(172-172)WithModulesConfig(1679-1683)WithCustomModules(1786-1790)router-tests/testenv/utils.go (1)
AwaitChannelWithT(10-19)router/core/subscriptions_modules.go (2)
SubscriptionOnStartHandlerContext(15-53)StreamHandlerError(448-451)router/pkg/pubsub/kafka/engine_datasource.go (2)
SubscriptionEventConfiguration(91-95)MutableEvent(62-66)router/pkg/pubsub/datasource/provider.go (1)
SubscriptionEventConfiguration(101-105)
router/core/graphql_handler.go (2)
router/core/subscriptions_modules.go (1)
StreamHandlerError(448-451)router/core/errors.go (1)
Extensions(49-55)
🪛 LanguageTool
adr/cosmo-streams-v1.md
[style] ~24-~24: ‘prior to’ might be wordy. Consider a shorter alternative.
Context: ...f events is received from the provider, prior to delivery. - StreamPublishEventHandler...
(EN_WORDINESS_PREMIUM_PRIOR_TO)
[style] ~25-~25: Use ‘will’ instead of ‘going to’ if the following action is certain.
Context: ...er`: Called each time a batch of events is going to be sent to the provider. ```go // STRU...
(GOING_TO_WILL)
[grammar] ~179-~179: Use a hyphen to join words.
Context: ...y the external system to/from Federation compatible Router events - **Event filte...
(QB_NEW_EN_HYPHEN)
[uncategorized] ~185-~185: If this is a compound adjective that modifies the following noun, use a hyphen.
Context: ... be integrated in the router in a fully backwards compatible way. When the new module system will b...
(EN_COMPOUND_ADJECTIVE_INTERNAL)
🪛 markdownlint-cli2 (0.18.1)
adr/cosmo-streams-v1.md
195-195: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
195-195: Strong style
Expected: asterisk; Actual: underscore
(MD050, strong-style)
195-195: Strong style
Expected: asterisk; Actual: underscore
(MD050, strong-style)
249-249: Hard tabs
Column: 1
(MD010, no-hard-tabs)
250-250: Hard tabs
Column: 1
(MD010, no-hard-tabs)
251-251: Hard tabs
Column: 1
(MD010, no-hard-tabs)
252-252: Hard tabs
Column: 1
(MD010, no-hard-tabs)
254-254: Hard tabs
Column: 1
(MD010, no-hard-tabs)
388-388: Hard tabs
Column: 1
(MD010, no-hard-tabs)
389-389: Hard tabs
Column: 1
(MD010, no-hard-tabs)
390-390: Hard tabs
Column: 1
(MD010, no-hard-tabs)
391-391: Hard tabs
Column: 1
(MD010, no-hard-tabs)
397-397: Hard tabs
Column: 1
(MD010, no-hard-tabs)
397-397: Hard tabs
Column: 6
(MD010, no-hard-tabs)
398-398: Hard tabs
Column: 1
(MD010, no-hard-tabs)
405-405: Hard tabs
Column: 1
(MD010, no-hard-tabs)
405-405: Hard tabs
Column: 6
(MD010, no-hard-tabs)
406-406: Hard tabs
Column: 1
(MD010, no-hard-tabs)
🔇 Additional comments (2)
adr/cosmo-streams-v1.md (1)
64-68: Clarify MutableStreamEvent.GetData() behavior relative to cloning.The ADR states that
StreamEvent.GetData()returns a copy (line 58 & 167), but doesn't explicitly document whetherMutableStreamEvent.GetData()inherits this cloning behavior or behaves differently. Based on the design rationale in section "Immutable vs Mutable events," clarify this behavior to prevent confusion for module developers implementing these interfaces.Consider adding a note like:
// MutableStreamEvent is a StreamEvent that can be modified. type MutableStreamEvent interface { StreamEvent // SetData sets the payload data for this event SetData([]byte) // Note: GetData() is inherited from StreamEvent and returns a copy, // or specify actual behavior if it differs (e.g., returns internal slice for performance). }If
MutableStreamEvent.GetData()intentionally returns an uncloned internal slice for performance (as indicated by implementation notes), the ADR should explicitly document this design choice alongside the rationale in section "Immutable vs Mutable events."router/core/graphql_handler.go (1)
393-432: All error type definitions and methods are correctly implemented.Verification confirms:
httpGraphqlErrortype has all required methods:Message(),ExtensionCode(), andStatusCode()at router/core/http_graphql_error.go:34-42- Error type constants
errorTypeEDFSHookErroranderrorTypeHttpErrorare properly defined in router/core/errors.go:38-39- Error classification logic in router/core/errors.go correctly maps
StreamHandlerError→errorTypeEDFSHookError(line 83-84) andhttpGraphqlError→errorTypeHttpError(line 99-100)- Tests exercise
StreamHandlerErrorpaths in router-tests/modules/start_subscription_test.goHowever, I could not find explicit tests for the cast failure defensive paths at lines 395 and 419. While the classification logic ensures casts should succeed in normal operation, verify that your test suite includes edge cases and comprehensive error scenario coverage to ensure the defensive error handling works as intended.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
♻️ Duplicate comments (2)
router-tests/modules/stream_publish_test.go (1)
31-38: Fix typo in explanatory commentThe comment has a small typo: “garantueed” → “guaranteed”.
- // By default events are immutable in Cosmo Streams hooks, because it is not garantueed they aren't + // By default events are immutable in Cosmo Streams hooks, because it is not guaranteed they aren'trouter-tests/modules/stream_receive_test.go (1)
208-217: Fix typo in immutability commentSame typo as noted previously: “garantueed” → “guaranteed”.
- // By default events are immutable in Cosmo Streams hooks, because it is not garantueed they aren't + // By default events are immutable in Cosmo Streams hooks, because it is not guaranteed they aren't
🧹 Nitpick comments (2)
router-tests/modules/stream-receive/module.go (2)
14-18: Consider makingHookCallCountan embeddedatomic.Int32instead of a pointerUsing a
*atomic.Int32forces callers to manage allocation and nil checks, and it becomes easier to accidentally swap the pointer after the router starts, which can introduce data races. An embedded value keeps the counter always available and concurrency‑safe with a simpler call site.Example refactor:
type StreamReceiveModule struct { Logger *zap.Logger Callback func(ctx core.StreamReceiveEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) - HookCallCount *atomic.Int32 // Counter to track how many times the hook is called + HookCallCount atomic.Int32 // Counter to track how many times the hook is called }and in
OnReceiveEvents:- if m.HookCallCount != nil { - m.HookCallCount.Add(1) - } + m.HookCallCount.Add(1)Tests can still read the counter via
m.HookCallCount.Load(), or take its address if they need a pointer.
27-41: Optionally prefer the request‑scoped logger from the handler contextThe hook implementation looks good and aligns with the
StreamReceiveEventHandlercontract. For per‑subscription diagnostics, you might get richer logs (request ID, operation name, tenant, etc.) by logging viactx.Logger()when available, and only falling back to the module logger:-func (m *StreamReceiveModule) OnReceiveEvents(ctx core.StreamReceiveEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) { - if m.Logger != nil { - m.Logger.Info("Stream Hook has been run") - } +func (m *StreamReceiveModule) OnReceiveEvents(ctx core.StreamReceiveEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) { + if logger := ctx.Logger(); logger != nil { + logger.Info("Stream Hook has been run") + } else if m.Logger != nil { + m.Logger.Info("Stream Hook has been run") + }If you intentionally want only module‑level logging here, you can ignore this; the current behavior is still correct.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
router-tests/modules/start-subscription/module.go(1 hunks)router-tests/modules/start_subscription_test.go(1 hunks)router-tests/modules/stream-publish/module.go(1 hunks)router-tests/modules/stream-receive/module.go(1 hunks)router-tests/modules/stream_publish_test.go(1 hunks)router-tests/modules/stream_receive_test.go(1 hunks)router/core/router.go(3 hunks)router/core/supervisor_instance.go(2 hunks)router/pkg/config/config.go(1 hunks)router/pkg/config/config.schema.json(1 hunks)router/pkg/config/fixtures/full.yaml(1 hunks)router/pkg/config/testdata/config_defaults.json(1 hunks)router/pkg/config/testdata/config_full.json(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (7)
- router/core/supervisor_instance.go
- router/pkg/config/config.schema.json
- router-tests/modules/start_subscription_test.go
- router-tests/modules/start-subscription/module.go
- router/pkg/config/fixtures/full.yaml
- router/pkg/config/testdata/config_full.json
- router-tests/modules/stream-publish/module.go
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
📚 Learning: 2025-11-13T10:10:47.680Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2329
File: router/pkg/pubsub/datasource/subscription_event_updater.go:86-88
Timestamp: 2025-11-13T10:10:47.680Z
Learning: In router/pkg/pubsub/datasource/subscription_event_updater.go, the SetHooks method is intentionally designed to only replace hook handlers, not reconfigure timeout or semaphore settings. The timeout and semaphore fields are meant to be set once during construction via NewSubscriptionEventUpdater and remain immutable. If different timeout or concurrency settings are needed, a new updater instance should be created rather than modifying the existing one.
Applied to files:
router/pkg/config/testdata/config_defaults.jsonrouter-tests/modules/stream_publish_test.gorouter/core/router.gorouter-tests/modules/stream_receive_test.go
📚 Learning: 2025-09-17T20:55:39.456Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2172
File: router/core/graph_server.go:0-0
Timestamp: 2025-09-17T20:55:39.456Z
Learning: The Initialize method in router/internal/retrytransport/manager.go has been updated to properly handle feature-flag-only subgraphs by collecting subgraphs from both routerConfig.GetSubgraphs() and routerConfig.FeatureFlagConfigs.ConfigByFeatureFlagName, ensuring all subgraphs receive retry configuration.
Applied to files:
router/pkg/config/testdata/config_defaults.jsonrouter/core/router.go
📚 Learning: 2025-08-28T09:17:49.477Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
Applied to files:
router-tests/modules/stream-receive/module.go
📚 Learning: 2025-08-28T09:18:10.121Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:100-108
Timestamp: 2025-08-28T09:18:10.121Z
Learning: In router-tests/http_subscriptions_test.go heartbeat tests, the message ordering should remain strict with data messages followed by heartbeat messages, as the timing is deterministic and known by design in the Cosmo router implementation.
Applied to files:
router-tests/modules/stream_publish_test.gorouter/core/router.gorouter-tests/modules/stream_receive_test.go
📚 Learning: 2025-10-01T20:39:16.113Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.
Applied to files:
router-tests/modules/stream_receive_test.go
🧬 Code graph analysis (4)
router-tests/modules/stream-receive/module.go (3)
router/core/subscriptions_modules.go (2)
StreamReceiveEventHandlerContext(269-292)StreamReceiveEventHandler(294-304)router/pkg/pubsub/datasource/provider.go (1)
StreamEvents(61-63)router/core/modules.go (3)
ModuleContext(160-164)Module(52-54)ModuleInfo(44-50)
router-tests/modules/stream_publish_test.go (9)
router-tests/testenv/testenv.go (5)
Run(105-122)ConfigWithEdfsKafkaJSONTemplate(85-85)Environment(1733-1769)ConfigWithEdfsNatsJSONTemplate(87-87)ConfigWithEdfsRedisJSONTemplate(89-89)router-tests/modules/stream-publish/module.go (1)
PublishModule(14-18)router/core/subscriptions_modules.go (1)
StreamPublishEventHandlerContext(306-327)router/pkg/pubsub/datasource/provider.go (5)
StreamEvents(61-63)MutableStreamEvent(94-98)PublishEventConfiguration(108-112)StreamEvent(86-91)NewStreamEvents(81-83)router/core/router.go (3)
Option(172-172)WithModulesConfig(1679-1683)WithCustomModules(1786-1790)router-tests/events/utils.go (3)
KafkaEnsureTopicExists(15-32)ReadKafkaMessages(55-71)ReadRedisMessages(96-121)router/pkg/pubsub/kafka/engine_datasource.go (2)
PublishEventConfiguration(188-192)MutableEvent(62-66)router/pkg/pubsub/redis/engine_datasource.go (2)
PublishEventConfiguration(141-145)MutableEvent(35-37)router/pkg/pubsub/nats/engine_datasource.go (1)
MutableEvent(42-45)
router/core/router.go (2)
router/core/subscriptions_modules.go (3)
SubscriptionOnStartHandler(202-206)StreamPublishEventHandler(329-336)StreamReceiveEventHandler(294-304)router/pkg/config/config.go (1)
StreamsHandlerConfiguration(648-650)
router-tests/modules/stream_receive_test.go (8)
router-tests/testenv/testenv.go (2)
Run(105-122)Environment(1733-1769)router-tests/modules/stream-receive/module.go (1)
StreamReceiveModule(14-18)router/pkg/config/config.go (4)
Config(1017-1092)Graph(24-29)StreamsHandlerConfiguration(648-650)OnReceiveEventsConfiguration(652-655)router/core/router.go (2)
Option(172-172)WithStreamsHandlerConfiguration(2143-2148)router-tests/events/utils.go (2)
KafkaEnsureTopicExists(15-32)ProduceKafkaMessage(34-53)router-tests/testenv/utils.go (1)
AwaitChannelWithT(10-19)router/core/subscriptions_modules.go (1)
StreamReceiveEventHandlerContext(269-292)router/pkg/pubsub/datasource/provider.go (4)
StreamEvents(61-63)StreamEvent(86-91)NewStreamEvents(81-83)MutableStreamEvent(94-98)
🔇 Additional comments (7)
router/pkg/config/testdata/config_defaults.json (1)
293-305: Defaults forevents.handlers.on_receive_eventslook consistentThe JSON defaults (
MaxConcurrentHandlers: 100,HandlerTimeout: 5000000000) match the Go defaults inOnReceiveEventsConfigurationandrouter.NewRouter, so the config surface is coherent for both YAML/env and JSON-based tests.router/core/router.go (3)
256-262: Subscription receive‑hook defaults align with config; semantics are “zero means default”The defaulting of
maxConcurrentHandlersto 100 andtimeoutto5*time.Secondwhen zero matchesOnReceiveEventsConfiguration’s envDefaults and avoids accidentally running with unbounded/zero values when options are omitted or constructed programmatically.
686-696: Module stream hooks are wired cleanly intosubscriptionHooksRegistering
SubscriptionOnStartHandler,StreamPublishEventHandler, andStreamReceiveEventHandlerhere mirrors the existing module wiring (middleware, pre/post origin, trace propagators) and cleanly centralizes all stream‑related hooks undersubscriptionHooks.
2143-2148:WithStreamsHandlerConfigurationcorrectly keeps timeout/concurrency construction‑time onlyThis option writes into
r.subscriptionHooks.onReceiveEventsbefore the updater is created, keeping timeout/semaphore configuration construction‑time only and leavingSetHooksto replace handlers only, as intended. Also consistent with the “zero means default” behavior enforced inNewRouter.router/pkg/config/config.go (1)
637-655: Streams handler config is well‑shaped and matches router expectations
EventsConfiguration.Handlers.OnReceiveEventswithMaxConcurrentHandlersandHandlerTimeout(envDefault 100 / 5s) lines up with the router’sWithStreamsHandlerConfigurationandsubscriptionHooksdefaults, giving a single, consistent configuration surface for receive‑hook concurrency/timeout.router-tests/modules/stream_receive_test.go (2)
842-967: Timeout test for out‑of‑order delivery is well‑designedThe combination of
hookDelay > hookTimeout, three produced events, collection of IDs, and explicit assertions about the last/first positions plus the timeout log message provides strong coverage of the timeout semantics and the documented “may arrive out of order” trade‑off.
663-688: Fix invalidforloop overtc.numSubscribersin concurrency test
for i := range tc.numSubscribersis not valid Go (you can’trangeover anint). This will prevent the tests from compiling. It should be a standard counted loop:- for i := range tc.numSubscribers { + for i := 0; i < tc.numSubscribers; i++ { clients[i] = graphql.NewSubscriptionClient(surl) clientRunChs[i] = make(chan error) subscriptionArgsChs[i] = make(chan kafkaSubscriptionArgs, 1) @@ - go func(i int) { - clientRunChs[i] <- clients[i].Run() - }(i) + go func(i int) { + clientRunChs[i] <- clients[i].Run() + }(i) }With this fix, the test will correctly spin up the expected number of clients and exercise the
MaxConcurrentHandlersbehavior.Also applies to: 789-808
⛔ Skipped due to learnings
Learnt from: SkArchon Repo: wundergraph/cosmo PR: 2216 File: router-tests/authentication_test.go:847-847 Timestamp: 2025-09-17T13:51:50.823Z Learning: Go 1.22 introduced the ability to range over integers, making `for range n` syntax valid for iterating n times in loops.
Removed a switch case, which is not reachable anymore. Also removed an early return in case a type assertion did not work. This way we ensure the response is actually sent either way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
router-tests/modules/start_subscription_test.go (1)
154-154: Buffer all callback coordination channels to prevent deadlocks.Multiple test helper channels are created unbuffered, creating race conditions where subscription callbacks can block if they attempt to send before the test goroutine reaches the corresponding
AwaitChannelWithTreceive:
- Line 154:
subscriptionArgsChin "Test StartSubscription write event works"- Line 194:
callbackCalledin "Test StartSubscription closes client connection when hook returns an error"- Lines 341, 352:
subscriptionOneArgsChandsubscriptionTwoArgsChin "Test event emitted byStartSubscription sends event only to the client that triggered the hook"- Line 441:
subscriptionOneArgsChin "Test StartSubscription error is propagated to the client"- Line 593:
subscriptionOneArgsChin "Test StartSubscription hook is called for engine subscription and write event works"- Line 684:
subscriptionOneArgsChin "Test when StartSubscription hook returns an error, the OnOriginResponse hook is not called"The timing window between when
client.Run()activates subscriptions and when the test reachesAwaitChannelWithTis non-deterministic. If a callback fires during this gap and attempts to send to an unbuffered channel, it blocks the router goroutine indefinitely, causing test hangs.Note: Past review comments for lines 194 and 341 were marked "Fixed", but the code still shows unbuffered channels.
Apply buffering to all callback coordination channels:
// Test 2, line 154: -subscriptionArgsCh := make(chan kafkaSubscriptionArgs) +subscriptionArgsCh := make(chan kafkaSubscriptionArgs, 1) // Test 3, line 194: -callbackCalled := make(chan bool) +callbackCalled := make(chan bool, 1) // Test 4, lines 341 and 352: -subscriptionOneArgsCh := make(chan kafkaSubscriptionArgs) +subscriptionOneArgsCh := make(chan kafkaSubscriptionArgs, 1) ... -subscriptionTwoArgsCh := make(chan kafkaSubscriptionArgs) +subscriptionTwoArgsCh := make(chan kafkaSubscriptionArgs, 1) // Test 5, line 441: -subscriptionOneArgsCh := make(chan kafkaSubscriptionArgs) +subscriptionOneArgsCh := make(chan kafkaSubscriptionArgs, 1) // Test 7, line 593: -subscriptionOneArgsCh := make(chan subscriptionArgs) +subscriptionOneArgsCh := make(chan subscriptionArgs, 1) // Test 8, line 684: -subscriptionOneArgsCh := make(chan subscriptionArgs) +subscriptionOneArgsCh := make(chan subscriptionArgs, 1)Buffering makes the
require.Len(..., 0)cleanup assertions at lines 383 and 475 meaningful—they can now detect unexpected callback invocations.Also applies to: 194-194, 341-341, 352-352, 441-441, 593-593, 684-684
🧹 Nitpick comments (1)
router-tests/modules/start_subscription_test.go (1)
279-279: Fix typo in test name."byStartSubscription" should be "by StartSubscription" (add space before "StartSubscription").
-t.Run("Test event emitted byStartSubscription sends event only to the client that triggered the hook", func(t *testing.T) { +t.Run("Test event emitted by StartSubscription sends event only to the client that triggered the hook", func(t *testing.T) {
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
router-tests/modules/start_subscription_test.go(1 hunks)
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
📚 Learning: 2025-08-28T09:18:10.121Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:100-108
Timestamp: 2025-08-28T09:18:10.121Z
Learning: In router-tests/http_subscriptions_test.go heartbeat tests, the message ordering should remain strict with data messages followed by heartbeat messages, as the timing is deterministic and known by design in the Cosmo router implementation.
Applied to files:
router-tests/modules/start_subscription_test.go
📚 Learning: 2025-08-28T09:17:49.477Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
Applied to files:
router-tests/modules/start_subscription_test.go
📚 Learning: 2025-10-01T20:39:16.113Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.
Applied to files:
router-tests/modules/start_subscription_test.go
📚 Learning: 2025-09-17T20:55:39.456Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2172
File: router/core/graph_server.go:0-0
Timestamp: 2025-09-17T20:55:39.456Z
Learning: The Initialize method in router/internal/retrytransport/manager.go has been updated to properly handle feature-flag-only subgraphs by collecting subgraphs from both routerConfig.GetSubgraphs() and routerConfig.FeatureFlagConfigs.ConfigByFeatureFlagName, ensuring all subgraphs receive retry configuration.
Applied to files:
router-tests/modules/start_subscription_test.go
📚 Learning: 2025-11-13T10:10:47.680Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2329
File: router/pkg/pubsub/datasource/subscription_event_updater.go:86-88
Timestamp: 2025-11-13T10:10:47.680Z
Learning: In router/pkg/pubsub/datasource/subscription_event_updater.go, the SetHooks method is intentionally designed to only replace hook handlers, not reconfigure timeout or semaphore settings. The timeout and semaphore fields are meant to be set once during construction via NewSubscriptionEventUpdater and remain immutable. If different timeout or concurrency settings are needed, a new updater instance should be created rather than modifying the existing one.
Applied to files:
router-tests/modules/start_subscription_test.go
🧬 Code graph analysis (1)
router-tests/modules/start_subscription_test.go (6)
router-tests/testenv/testenv.go (4)
Run(105-122)ConfigWithEdfsKafkaJSONTemplate(85-85)LogObservationConfig(388-391)Environment(1733-1769)router-tests/modules/start-subscription/module.go (1)
StartSubscriptionModule(14-19)router/core/router.go (3)
Option(172-172)WithModulesConfig(1679-1683)WithCustomModules(1786-1790)router-tests/testenv/utils.go (1)
AwaitChannelWithT(10-19)router/core/subscriptions_modules.go (2)
SubscriptionOnStartHandlerContext(15-53)StreamHandlerError(448-451)router/pkg/pubsub/kafka/engine_datasource.go (2)
SubscriptionEventConfiguration(91-95)MutableEvent(62-66)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
♻️ Duplicate comments (5)
router-tests/modules/stream_publish_test.go (1)
91-93: Update comment to match actual assertion.The comment mentions verifying the hook via a log message, but the test actually checks
HookCallCount. Additionally, line 92 has a grammatical error: "as been" should be "has been".Apply this diff:
- // This test verifies that the publish hook is invoked when a mutation with a Kafka publish is executed. - // It confirms the hook as been called by checking a log message, which is written by the custom module - // used in these tests right before the actual hook is being called. + // This test verifies that the publish hook is invoked when a mutation with a Kafka publish is executed. + // It confirms the hook has been called by asserting HookCallCount on the custom module.router-tests/modules/stream_receive_test.go (4)
212-213: Fix typo in comment explaining immutabilityThe comment should say “guaranteed” instead of “garantueed” to keep the documentation clear:
- // By default events are immutable in Cosmo Streams hooks, because it is not garantueed they aren't + // By default events are immutable in Cosmo Streams hooks, because it is not guaranteed they aren't
381-390: Avoid sharing one subscription struct between two clients (data race)Both
clientandclient2subscribe using&subscriptionQuery, so two independent subscription goroutines can concurrently unmarshal into the same target struct, which is a data race (even if this test only inspects the raw JSON bytes).Give each client its own target value:
- var subscriptionQuery struct { + var subscriptionQuery struct { employeeUpdatedMyKafka struct { ID float64 `graphql:"id"` Details struct { Forename string `graphql:"forename"` Surname string `graphql:"surname"` } `graphql:"details"` } `graphql:"employeeUpdatedMyKafka(employeeID: 3)"` } + var subscriptionQueryAuth struct { + employeeUpdatedMyKafka struct { + ID float64 `graphql:"id"` + Details struct { + Forename string `graphql:"forename"` + Surname string `graphql:"surname"` + } `graphql:"details"` + } `graphql:"employeeUpdatedMyKafka(employeeID: 3)"` + } @@ - subscriptionOneID, err := client.Subscribe(&subscriptionQuery, nil, func(dataValue []byte, errValue error) error { + subscriptionOneID, err := client.Subscribe(&subscriptionQuery, nil, func(dataValue []byte, errValue error) error { @@ - subscriptionTwoID, err := client2.Subscribe(&subscriptionQuery, nil, func(dataValue []byte, errValue error) error { + subscriptionTwoID, err := client2.Subscribe(&subscriptionQueryAuth, nil, func(dataValue []byte, errValue error) error {Also applies to: 423-433
623-630: UnbufferedsubscriptionArgsChis never read and can block callbackIn the batch-error test,
subscriptionArgsChis unbuffered and written from the subscription callback but never read in the test, so any callback invocation would block and potentially hangclient.Run().Either remove the channel entirely and use a no-op callback, or make it buffered and non-blocking. For minimal change:
- subscriptionArgsCh := make(chan kafkaSubscriptionArgs) - subscriptionOneID, err := client.Subscribe(&subscriptionOne, nil, func(dataValue []byte, errValue error) error { - subscriptionArgsCh <- kafkaSubscriptionArgs{ - dataValue: dataValue, - errValue: errValue, - } - return nil - }) + subscriptionArgsCh := make(chan kafkaSubscriptionArgs, 1) // best-effort capture if callback fires + subscriptionOneID, err := client.Subscribe(&subscriptionOne, nil, func(dataValue []byte, errValue error) error { + select { + case subscriptionArgsCh <- kafkaSubscriptionArgs{dataValue: dataValue, errValue: errValue}: + default: + // channel not observed in this test; drop to avoid blocking + } + return nil + })(Or, if you don’t care about the callback here, just replace it with a no-op that ignores
dataValue/errValueand drop the channel entirely.)
773-781: Fix invalidrangeon int and per-client subscription struct data raceIn the concurrency test:
for i := range tc.numSubscribersdoes not compile in Go (you can’t range over anint).- All clients subscribe with the same
subscriptionQueryinstance, so multiple goroutines can concurrently decode into the same struct, causing a data race.Define a per-client subscription struct slice and iterate over the
clientsslice:- var subscriptionQuery struct { - employeeUpdatedMyKafka struct { - ID float64 `graphql:"id"` - Details struct { - Forename string `graphql:"forename"` - Surname string `graphql:"surname"` - } `graphql:"details"` - } `graphql:"employeeUpdatedMyKafka(employeeID: 3)"` - } - - surl := xEnv.GraphQLWebSocketSubscriptionURL() - client := graphql.NewSubscriptionClient(surl) - - clients := make([]*graphql.SubscriptionClient, tc.numSubscribers) - clientRunChs := make([]chan error, tc.numSubscribers) - subscriptionArgsChs := make([]chan kafkaSubscriptionArgs, tc.numSubscribers) - - for i := range tc.numSubscribers { + type subscriptionQuery struct { + employeeUpdatedMyKafka struct { + ID float64 `graphql:"id"` + Details struct { + Forename string `graphql:"forename"` + Surname string `graphql:"surname"` + } `graphql:"details"` + } `graphql:"employeeUpdatedMyKafka(employeeID: 3)"` + } + + surl := xEnv.GraphQLWebSocketSubscriptionURL() + + clients := make([]*graphql.SubscriptionClient, tc.numSubscribers) + clientRunChs := make([]chan error, tc.numSubscribers) + subscriptionArgsChs := make([]chan kafkaSubscriptionArgs, tc.numSubscribers) + subscriptions := make([]subscriptionQuery, tc.numSubscribers) + + for i := range clients { clients[i] = graphql.NewSubscriptionClient(surl) clientRunChs[i] = make(chan error) subscriptionArgsChs[i] = make(chan kafkaSubscriptionArgs, 1) @@ - subscriptionID, err := clients[i].Subscribe(&subscriptionQuery, nil, func(dataValue []byte, errValue error) error { + subscriptionID, err := clients[i].Subscribe(&subscriptions[idx], nil, func(dataValue []byte, errValue error) error {This both fixes compilation and ensures each client has its own stable decode target.
Also applies to: 785-808
🧹 Nitpick comments (1)
router-tests/modules/stream_publish_test.go (1)
134-137: Improve comment clarity.The comment phrasing on lines 136-137 could be clearer. Consider rephrasing for better readability.
Apply this diff:
- // This test verifies that when the publish hook returns events and an error - // but the messages are still sent to Kafka. + // This test verifies that when the publish hook returns both events and an error, + // the messages are still sent to Kafka.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
router-tests/modules/stream_publish_test.go(1 hunks)router-tests/modules/stream_receive_test.go(1 hunks)
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
📚 Learning: 2025-08-28T09:18:10.121Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:100-108
Timestamp: 2025-08-28T09:18:10.121Z
Learning: In router-tests/http_subscriptions_test.go heartbeat tests, the message ordering should remain strict with data messages followed by heartbeat messages, as the timing is deterministic and known by design in the Cosmo router implementation.
Applied to files:
router-tests/modules/stream_receive_test.gorouter-tests/modules/stream_publish_test.go
📚 Learning: 2025-08-28T09:17:49.477Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
Applied to files:
router-tests/modules/stream_receive_test.go
📚 Learning: 2025-10-01T20:39:16.113Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.
Applied to files:
router-tests/modules/stream_receive_test.gorouter-tests/modules/stream_publish_test.go
📚 Learning: 2025-11-16T11:52:34.064Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2273
File: router/pkg/pubsub/redis/engine_datasource.go:37-42
Timestamp: 2025-11-16T11:52:34.064Z
Learning: In router/pkg/pubsub/redis/engine_datasource.go (and similar files for kafka/nats), the MutableEvent.GetData() method intentionally returns the internal Data slice without cloning, while Event.GetData() clones the slice. This is by design: MutableEvent is designed to be mutable (so callers can modify the data), while Event is immutable (cloning prevents modification). This difference is part of the Cosmo Streams v1 hook interface design.
Applied to files:
router-tests/modules/stream_receive_test.go
📚 Learning: 2025-11-13T10:10:47.680Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2329
File: router/pkg/pubsub/datasource/subscription_event_updater.go:86-88
Timestamp: 2025-11-13T10:10:47.680Z
Learning: In router/pkg/pubsub/datasource/subscription_event_updater.go, the SetHooks method is intentionally designed to only replace hook handlers, not reconfigure timeout or semaphore settings. The timeout and semaphore fields are meant to be set once during construction via NewSubscriptionEventUpdater and remain immutable. If different timeout or concurrency settings are needed, a new updater instance should be created rather than modifying the existing one.
Applied to files:
router-tests/modules/stream_receive_test.gorouter-tests/modules/stream_publish_test.go
🧬 Code graph analysis (2)
router-tests/modules/stream_receive_test.go (10)
router-tests/testenv/testenv.go (3)
Run(105-122)LogObservationConfig(388-391)Environment(1733-1769)router-tests/modules/stream-receive/module.go (1)
StreamReceiveModule(14-18)router/pkg/config/config.go (4)
Config(1017-1092)Graph(24-29)StreamsHandlerConfiguration(648-650)OnReceiveEventsConfiguration(652-655)router/core/router.go (5)
Option(172-172)WithModulesConfig(1679-1683)WithCustomModules(1786-1790)WithAccessController(1839-1843)WithStreamsHandlerConfiguration(2143-2148)router-tests/events/utils.go (2)
KafkaEnsureTopicExists(15-32)ProduceKafkaMessage(34-53)router-tests/testenv/utils.go (1)
AwaitChannelWithT(10-19)router/core/subscriptions_modules.go (1)
StreamReceiveEventHandlerContext(269-292)router/pkg/pubsub/datasource/provider.go (4)
StreamEvents(61-63)StreamEvent(86-91)NewStreamEvents(81-83)MutableStreamEvent(94-98)router/pkg/authentication/jwks_token_decoder.go (3)
NewJwksTokenDecoder(78-241)JWKSConfig(44-56)TokenDecoder(21-23)router/pkg/authentication/http_header_authenticator.go (2)
HttpHeaderAuthenticatorOptions(81-89)NewHttpHeaderAuthenticator(93-113)
router-tests/modules/stream_publish_test.go (10)
router-tests/testenv/testenv.go (2)
Run(105-122)Environment(1733-1769)router-tests/modules/stream-publish/module.go (1)
PublishModule(14-18)router/core/subscriptions_modules.go (1)
StreamPublishEventHandlerContext(306-327)router/pkg/pubsub/datasource/provider.go (5)
StreamEvents(61-63)MutableStreamEvent(94-98)PublishEventConfiguration(108-112)StreamEvent(86-91)NewStreamEvents(81-83)router/core/router.go (3)
Option(172-172)WithModulesConfig(1679-1683)WithCustomModules(1786-1790)router-tests/events/utils.go (3)
KafkaEnsureTopicExists(15-32)ReadKafkaMessages(55-71)ReadRedisMessages(96-121)router-tests/testenv/utils.go (1)
AwaitChannelWithT(10-19)router/pkg/pubsub/kafka/engine_datasource.go (2)
PublishEventConfiguration(188-192)MutableEvent(62-66)router/pkg/pubsub/redis/engine_datasource.go (2)
PublishEventConfiguration(141-145)MutableEvent(35-37)router/pkg/pubsub/nats/engine_datasource.go (1)
MutableEvent(42-45)
🔇 Additional comments (6)
router-tests/modules/stream_publish_test.go (2)
282-285: LGTM: Proper channel handling with timeout.The use of
testenv.AwaitChannelWithTcorrectly addresses the race condition that would occur withrequire.Lenon a channel. This ensures the test waits for the message to arrive or times out gracefully.
289-360: LGTM: Comprehensive test of Kafka-specific mutations.This test effectively demonstrates the publish hook's ability to:
- Clone events to obtain mutable copies
- Access operation variables from the context
- Modify event data
- Add broker-specific features (Kafka headers)
The test logic is correct and provides good coverage of the Kafka publish hook functionality.
router-tests/modules/stream_receive_test.go (4)
37-115: Subscription happy-path wiring looks correctTest setup (Kafka topic provisioning, single WS client, subscription, event production, and shutdown) is coherent and exercises the receive hook path end-to-end; the
HookCallCountassertion gives a clear signal that the hook is actually invoked.
117-206: Event cloning & mutation pattern is correctThe hook’s use of
event.Clone()followed bySetDataand wrapping indatasource.NewStreamEventscleanly validates the intended “immutable by default, mutable via clone” contract, and the assertions confirm only the transformed payload reaches the client.
471-572: Header-based hook condition is well-structuredUsing
http.CanonicalHeaderKeyand readingctx.Request().Header[customHeader]to gate event mutation is a clean way to validate header propagation into the hook context; the test flow (single client with custom header, one Kafka event, JSON assertion, hook-count check) is tight and clear.
842-968: Out-of-order delivery & timeout behavior is well exercisedThe last test nicely validates the timeout behavior: delaying only the first hook call, collecting IDs via a buffered channel and
AwaitChannelWithT, asserting that all three IDs arrive plus that ID1arrives last, and checking for the specific timeout log message and hook call count. This gives good confidence in the trade-off semantics where timeouts can reorder events.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
router/core/errors.go (1)
38-38: Consider renaming for consistency with Cosmo Streams v1.The constant uses
errorTypeEDFSHookErrornaming while this PR implements Cosmo Streams v1 and checks forStreamHandlerError. Since the PR objectives indicate this is rebranding EDFS to Cosmo Streams, considererrorTypeStreamHookErrorfor consistency.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
router-tests/modules/stream_receive_test.go(1 hunks)router/core/errors.go(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- router-tests/modules/stream_receive_test.go
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
🧬 Code graph analysis (1)
router/core/errors.go (1)
router/core/subscriptions_modules.go (1)
StreamHandlerError(448-451)
🔇 Additional comments (1)
router/core/errors.go (1)
81-84: No action required—StreamHandlerError correctly implements the error interface.Verification confirms that
StreamHandlerErrorhas the requiredError() stringmethod (router/core/subscriptions_modules.go:454-456), so theerrors.Ascheck will work as intended and properly classify hook errors.
This actually introduces a bug, where occasionally subscription clients will get disconnected after receiving messages when we deal with lots of connecting and disconnecting clients. It depends on two contexts being regularly checked inside the Kafka topic poller and if one of them cancels, the connections would get closed. These contexts are meant to handle the lifecycle of the Kafka adapter, not subscriptions. So we should not close the subscription updater when the poller is canceled. After this commit it behaves like the router behaves before Cosmo Streams. All of this can be reevaluated, maybe there is room for improvement. But for now its better not to change this bevaviour.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (6)
adr/cosmo-streams-v1.md (5)
23-26: Clarify language and naming terminology.Several lines have minor wording issues that could be tightened:
- Line 24: "prior to delivery" → suggest "before delivery" (more concise)
- Line 25: "going to be sent" → suggest "will be sent" (more direct)
- `StreamReceiveEventHandler`: Triggered for each client/subscription when a batch of events is received from the provider, prior to delivery. - `StreamPublishEventHandler`: Called each time a batch of events is going to be sent to the provider. + `StreamReceiveEventHandler`: Triggered for each client/subscription when a batch of events are received from the provider, before delivery. + `StreamPublishEventHandler`: Called each time a batch of events will be sent to the provider.
219-219: Apply hyphenation to compound adjectives.Two instances where compound adjectives should be hyphenated for clarity:
- Line 219: "to/from Federation compatible" → "Federation-compatible"
- Line 225: "fully backwards compatible" → "fully backward-compatible"
- **Data mapping**: Transforming events data from the format that could be used by the external system to/from Federation compatible Router events + **Data mapping**: Transforming events data from the format that could be used by the external system to/from Federation-compatible Router events- The new hooks can be integrated in the router in a fully backwards compatible way. + The new hooks can be integrated in the router in a fully backward-compatible way.Also applies to: 225-225
235-235: Correct emphasis and heading formatting.Line 235 uses underscores (
__text__) for strong emphasis instead of asterisks (**text**), and treats strong text as a heading when it should be a proper Markdown heading (using##).- __All examples reflect the current implementation and match the actual API__ + ## All examples reflect the current implementation and match the actual API
289-292: Replace hard tabs with spaces in example code blocks.Additional hard tabs detected in the Go code examples (lines 289–292, 294, 428–431, 437–438, 445–446). Replace with consistent space indentation.
Also applies to: 294-294, 428-431, 437-438, 445-446
87-189: Replace hard tabs with spaces in interface definitions (lines 87–189).Hard tabs detected in all interface method definitions and comments. Replace with spaces for consistency and portability across different editors and tools.
router/pkg/pubsub/kafka/adapter.go (1)
126-180: Consider explicitly closing the per-subscription Kafka consumer clientIn
Subscribe, a newkgo.Clientis created per subscription and used only inside the poller goroutine. WhentopicPollerreturns (either on shutdown or error), the goroutine just exits; the consumer client is never explicitly closed here.To avoid potential lingering background goroutines or connections in the franz-go client, consider closing it when the poller ends:
go func() { - defer p.closeWg.Done() + defer p.closeWg.Done() + defer client.Close() err := p.topicPoller(ctx, client, updater, PollerOpts{providerId: conf.ProviderID()}) if err != nil { // ... } }()If the consumer lifecycle is already managed elsewhere, this can be skipped, but as written this function appears to be the only owner of
client.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
adr/cosmo-streams-v1.md(1 hunks)router/pkg/pubsub/kafka/adapter.go(5 hunks)
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
📚 Learning: 2025-11-16T11:52:34.064Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2273
File: router/pkg/pubsub/redis/engine_datasource.go:37-42
Timestamp: 2025-11-16T11:52:34.064Z
Learning: In router/pkg/pubsub/redis/engine_datasource.go (and similar files for kafka/nats), the MutableEvent.GetData() method intentionally returns the internal Data slice without cloning, while Event.GetData() clones the slice. This is by design: MutableEvent is designed to be mutable (so callers can modify the data), while Event is immutable (cloning prevents modification). This difference is part of the Cosmo Streams v1 hook interface design.
Applied to files:
router/pkg/pubsub/kafka/adapter.goadr/cosmo-streams-v1.md
📚 Learning: 2025-11-13T10:10:47.680Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2329
File: router/pkg/pubsub/datasource/subscription_event_updater.go:86-88
Timestamp: 2025-11-13T10:10:47.680Z
Learning: In router/pkg/pubsub/datasource/subscription_event_updater.go, the SetHooks method is intentionally designed to only replace hook handlers, not reconfigure timeout or semaphore settings. The timeout and semaphore fields are meant to be set once during construction via NewSubscriptionEventUpdater and remain immutable. If different timeout or concurrency settings are needed, a new updater instance should be created rather than modifying the existing one.
Applied to files:
router/pkg/pubsub/kafka/adapter.go
📚 Learning: 2025-08-28T09:17:49.477Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
Applied to files:
adr/cosmo-streams-v1.md
🧬 Code graph analysis (1)
router/pkg/pubsub/kafka/adapter.go (5)
router/pkg/pubsub/datasource/provider.go (6)
Adapter(24-28)ProviderType(52-52)ProviderTypeKafka(56-56)StreamEvent(86-91)SubscriptionEventConfiguration(101-105)PublishEventConfiguration(108-112)router/pkg/pubsub/datasource/subscription_event_updater.go (1)
SubscriptionEventUpdater(21-26)router/pkg/metric/stream_metric_store.go (3)
StreamsEvent(25-31)ProviderType(16-16)ProviderTypeKafka(19-19)router/pkg/pubsub/kafka/engine_datasource.go (4)
Event(21-23)MutableEvent(62-66)SubscriptionEventConfiguration(91-95)PublishEventConfiguration(188-192)router/pkg/pubsub/datasource/error.go (2)
NewError(12-17)Error(3-6)
🪛 LanguageTool
adr/cosmo-streams-v1.md
[style] ~24-~24: ‘prior to’ might be wordy. Consider a shorter alternative.
Context: ...f events is received from the provider, prior to delivery. - StreamPublishEventHandler...
(EN_WORDINESS_PREMIUM_PRIOR_TO)
[style] ~25-~25: Use ‘will’ instead of ‘going to’ if the following action is certain.
Context: ...er`: Called each time a batch of events is going to be sent to the provider. ```go // STRU...
(GOING_TO_WILL)
[grammar] ~219-~219: Use a hyphen to join words.
Context: ...y the external system to/from Federation compatible Router events - **Event filte...
(QB_NEW_EN_HYPHEN)
[uncategorized] ~225-~225: If this is a compound adjective that modifies the following noun, use a hyphen.
Context: ... be integrated in the router in a fully backwards compatible way. When the new module system will b...
(EN_COMPOUND_ADJECTIVE_INTERNAL)
🪛 markdownlint-cli2 (0.18.1)
adr/cosmo-streams-v1.md
87-87: Hard tabs
Column: 1
(MD010, no-hard-tabs)
88-88: Hard tabs
Column: 1
(MD010, no-hard-tabs)
89-89: Hard tabs
Column: 1
(MD010, no-hard-tabs)
90-90: Hard tabs
Column: 1
(MD010, no-hard-tabs)
91-91: Hard tabs
Column: 1
(MD010, no-hard-tabs)
92-92: Hard tabs
Column: 1
(MD010, no-hard-tabs)
93-93: Hard tabs
Column: 1
(MD010, no-hard-tabs)
94-94: Hard tabs
Column: 1
(MD010, no-hard-tabs)
95-95: Hard tabs
Column: 1
(MD010, no-hard-tabs)
96-96: Hard tabs
Column: 1
(MD010, no-hard-tabs)
97-97: Hard tabs
Column: 1
(MD010, no-hard-tabs)
98-98: Hard tabs
Column: 1
(MD010, no-hard-tabs)
99-99: Hard tabs
Column: 1
(MD010, no-hard-tabs)
100-100: Hard tabs
Column: 1
(MD010, no-hard-tabs)
101-101: Hard tabs
Column: 1
(MD010, no-hard-tabs)
102-102: Hard tabs
Column: 1
(MD010, no-hard-tabs)
103-103: Hard tabs
Column: 1
(MD010, no-hard-tabs)
104-104: Hard tabs
Column: 1
(MD010, no-hard-tabs)
105-105: Hard tabs
Column: 1
(MD010, no-hard-tabs)
106-106: Hard tabs
Column: 1
(MD010, no-hard-tabs)
107-107: Hard tabs
Column: 1
(MD010, no-hard-tabs)
108-108: Hard tabs
Column: 1
(MD010, no-hard-tabs)
109-109: Hard tabs
Column: 1
(MD010, no-hard-tabs)
110-110: Hard tabs
Column: 1
(MD010, no-hard-tabs)
111-111: Hard tabs
Column: 1
(MD010, no-hard-tabs)
112-112: Hard tabs
Column: 1
(MD010, no-hard-tabs)
113-113: Hard tabs
Column: 1
(MD010, no-hard-tabs)
114-114: Hard tabs
Column: 1
(MD010, no-hard-tabs)
115-115: Hard tabs
Column: 1
(MD010, no-hard-tabs)
116-116: Hard tabs
Column: 1
(MD010, no-hard-tabs)
117-117: Hard tabs
Column: 1
(MD010, no-hard-tabs)
118-118: Hard tabs
Column: 1
(MD010, no-hard-tabs)
119-119: Hard tabs
Column: 1
(MD010, no-hard-tabs)
120-120: Hard tabs
Column: 1
(MD010, no-hard-tabs)
121-121: Hard tabs
Column: 1
(MD010, no-hard-tabs)
122-122: Hard tabs
Column: 1
(MD010, no-hard-tabs)
123-123: Hard tabs
Column: 1
(MD010, no-hard-tabs)
133-133: Hard tabs
Column: 1
(MD010, no-hard-tabs)
134-134: Hard tabs
Column: 1
(MD010, no-hard-tabs)
135-135: Hard tabs
Column: 1
(MD010, no-hard-tabs)
136-136: Hard tabs
Column: 1
(MD010, no-hard-tabs)
137-137: Hard tabs
Column: 1
(MD010, no-hard-tabs)
138-138: Hard tabs
Column: 1
(MD010, no-hard-tabs)
139-139: Hard tabs
Column: 1
(MD010, no-hard-tabs)
140-140: Hard tabs
Column: 1
(MD010, no-hard-tabs)
141-141: Hard tabs
Column: 1
(MD010, no-hard-tabs)
142-142: Hard tabs
Column: 1
(MD010, no-hard-tabs)
143-143: Hard tabs
Column: 1
(MD010, no-hard-tabs)
144-144: Hard tabs
Column: 1
(MD010, no-hard-tabs)
145-145: Hard tabs
Column: 1
(MD010, no-hard-tabs)
146-146: Hard tabs
Column: 1
(MD010, no-hard-tabs)
147-147: Hard tabs
Column: 1
(MD010, no-hard-tabs)
148-148: Hard tabs
Column: 1
(MD010, no-hard-tabs)
149-149: Hard tabs
Column: 1
(MD010, no-hard-tabs)
150-150: Hard tabs
Column: 1
(MD010, no-hard-tabs)
151-151: Hard tabs
Column: 1
(MD010, no-hard-tabs)
152-152: Hard tabs
Column: 1
(MD010, no-hard-tabs)
153-153: Hard tabs
Column: 1
(MD010, no-hard-tabs)
154-154: Hard tabs
Column: 1
(MD010, no-hard-tabs)
170-170: Hard tabs
Column: 1
(MD010, no-hard-tabs)
171-171: Hard tabs
Column: 1
(MD010, no-hard-tabs)
172-172: Hard tabs
Column: 1
(MD010, no-hard-tabs)
173-173: Hard tabs
Column: 1
(MD010, no-hard-tabs)
174-174: Hard tabs
Column: 1
(MD010, no-hard-tabs)
175-175: Hard tabs
Column: 1
(MD010, no-hard-tabs)
176-176: Hard tabs
Column: 1
(MD010, no-hard-tabs)
177-177: Hard tabs
Column: 1
(MD010, no-hard-tabs)
178-178: Hard tabs
Column: 1
(MD010, no-hard-tabs)
179-179: Hard tabs
Column: 1
(MD010, no-hard-tabs)
180-180: Hard tabs
Column: 1
(MD010, no-hard-tabs)
181-181: Hard tabs
Column: 1
(MD010, no-hard-tabs)
182-182: Hard tabs
Column: 1
(MD010, no-hard-tabs)
183-183: Hard tabs
Column: 1
(MD010, no-hard-tabs)
184-184: Hard tabs
Column: 1
(MD010, no-hard-tabs)
185-185: Hard tabs
Column: 1
(MD010, no-hard-tabs)
186-186: Hard tabs
Column: 1
(MD010, no-hard-tabs)
187-187: Hard tabs
Column: 1
(MD010, no-hard-tabs)
188-188: Hard tabs
Column: 1
(MD010, no-hard-tabs)
189-189: Hard tabs
Column: 1
(MD010, no-hard-tabs)
235-235: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
235-235: Strong style
Expected: asterisk; Actual: underscore
(MD050, strong-style)
235-235: Strong style
Expected: asterisk; Actual: underscore
(MD050, strong-style)
289-289: Hard tabs
Column: 1
(MD010, no-hard-tabs)
290-290: Hard tabs
Column: 1
(MD010, no-hard-tabs)
291-291: Hard tabs
Column: 1
(MD010, no-hard-tabs)
292-292: Hard tabs
Column: 1
(MD010, no-hard-tabs)
294-294: Hard tabs
Column: 1
(MD010, no-hard-tabs)
428-428: Hard tabs
Column: 1
(MD010, no-hard-tabs)
429-429: Hard tabs
Column: 1
(MD010, no-hard-tabs)
430-430: Hard tabs
Column: 1
(MD010, no-hard-tabs)
431-431: Hard tabs
Column: 1
(MD010, no-hard-tabs)
437-437: Hard tabs
Column: 1
(MD010, no-hard-tabs)
437-437: Hard tabs
Column: 6
(MD010, no-hard-tabs)
438-438: Hard tabs
Column: 1
(MD010, no-hard-tabs)
445-445: Hard tabs
Column: 1
(MD010, no-hard-tabs)
445-445: Hard tabs
Column: 6
(MD010, no-hard-tabs)
446-446: Hard tabs
Column: 1
(MD010, no-hard-tabs)
🔇 Additional comments (3)
adr/cosmo-streams-v1.md (1)
15-232: ADR structure and content are sound.The document clearly articulates the design rationale, interface contracts, and immutable-vs-mutable event trade-offs. The
EmitLocalEventterminology reflects the prior rename fromWriteEvent, and examples demonstrate realistic module implementation patterns (authorization, filtering, data transformation). No substantive documentation gaps identified.router/pkg/pubsub/kafka/adapter.go (2)
23-24: Adapter compile-time assertion is correctThe
var _ datasource.Adapter = (*ProviderAdapter)(nil)check is a good safeguard to keepProviderAdapteraligned with the publicdatasource.Adaptersurface as it evolves.
51-118: topicPoller event construction and metrics wiring look consistentThe changes to
topicPollerto:
- use
datasource.SubscriptionEventUpdater,- extract Kafka headers into a
map[string][]byte,- wrap records as
*Event{evt: &MutableEvent{...}}, and- emit
Consumemetrics withproviderIdfromPollerOptsare consistent with the Streams v1
StreamEvent/MutableStreamEventmodel and preserve key, value, and headers as expected. No issues from a correctness or API-usage perspective.Based on learnings
|
Waiting for Engine PR to get merged and new engine version to be released. Then I pull latest changes from main, update router go.mod, then its ready for merge. |
Summary by CodeRabbit
New Features
Configuration
Bug Fixes
Tests
Documentation
✏️ Tip: You can customize this high-level summary in your review settings.
Checklist
Context
This pull request implements Cosmo Streams v1. It rebrands and extends EDFS by the ability to add hooks in form of new custom modules:
SubscriptionOnStart: A handler running before a subscription is establishedOnReceiveEvents: A handler running whenever events are received from a message provider, before being passed to clientsOnPublishEvents: A handler running before events are getting sent to message providersThese custom modules extend EDFS´s capabilities by allowing custom authorization logic, event filtering, etc.
The preview docs explains this in more detail. It's a good place to see what the hooks are capable of and how to use them.
https://wundergraphinc-topics-streams-v1.mintlify.app/overview
Related pull requests
There are two other pull requests, which contribute to the initial release of Cosmo Streams.
Cosmo Docs: wundergraph/cosmo-docs#187
Engine: wundergraph/graphql-go-tools#1309
Important
This pull request must not be merged before the engine pull request has been merged. The router´s go.mod in this pull request currently points to a version of the engine from the Cosmo Streams branch and once we have an official version of the engine with these changes included, this pull requests gets updated to use that.