Conversation

@dkorittki
Contributor

@dkorittki dkorittki commented Oct 10, 2025

Summary by CodeRabbit

  • New Features

    • Provider-agnostic streaming hooks for subscription start, inbound receive, and outbound publish; multi-event publish and mutable/immutable event support across NATS/Kafka/Redis.
  • Configuration

    • New handlers settings to control receive concurrency and handler timeouts; configurable via schema and defaults.
  • Bug Fixes

    • Improved GraphQL error mapping and HTTP status handling for stream hook errors.
  • Tests

    • Extensive unit and integration tests validating hook behaviors and cross-provider flows.
  • Documentation

    • ADR/RFC and examples detailing Cosmo Streams v1 and hook usage.

Checklist

  • I have discussed my proposed changes in an issue and have received approval to proceed.
  • I have followed the coding standards of the project.
  • Tests or benchmarks have been added or updated.
  • Documentation has been updated on https://github.com/wundergraph/cosmo-docs.
  • I have read the Contributors Guide.

Context

This pull request implements Cosmo Streams v1. It rebrands EDFS and extends it with the ability to add hooks in the form of new custom modules:

  • SubscriptionOnStart: A handler running before a subscription is established
  • OnReceiveEvents: A handler running whenever events are received from a message provider, before being passed to clients
  • OnPublishEvents: A handler running before events are sent to message providers

These custom modules extend EDFS's capabilities by allowing custom authorization logic, event filtering, etc.
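
To illustrate the kind of event filtering such a hook makes possible, here is a minimal, self-contained sketch. The types `Event` and `ReceiveHook` and the functions `applyReceiveHooks` and `dropContaining` are hypothetical stand-ins invented for this example. They are not the router's API, which is described in the preview docs.

```go
package main

import (
	"bytes"
	"fmt"
)

// Event is a hypothetical stand-in for a stream event; it is not the router's type.
type Event struct {
	Data []byte
}

// ReceiveHook is a hypothetical hook signature: it receives a batch of events
// and returns the batch that should be passed on to clients.
type ReceiveHook func(events []Event) ([]Event, error)

// applyReceiveHooks runs the hooks in order, feeding each hook the output of
// the previous one, and stops at the first error.
func applyReceiveHooks(events []Event, hooks ...ReceiveHook) ([]Event, error) {
	for _, hook := range hooks {
		var err error
		events, err = hook(events)
		if err != nil {
			return nil, err
		}
	}
	return events, nil
}

// dropContaining returns a hook that filters out events whose payload
// contains the given marker.
func dropContaining(marker string) ReceiveHook {
	return func(events []Event) ([]Event, error) {
		kept := make([]Event, 0, len(events))
		for _, e := range events {
			if !bytes.Contains(e.Data, []byte(marker)) {
				kept = append(kept, e)
			}
		}
		return kept, nil
	}
}

func main() {
	in := []Event{
		{Data: []byte(`{"id": 1, "internal": false}`)},
		{Data: []byte(`{"id": 2, "internal": true}`)},
	}
	out, err := applyReceiveHooks(in, dropContaining(`"internal": true`))
	if err != nil {
		fmt.Println("hook failed:", err)
		return
	}
	fmt.Println(len(out), "event(s) passed the filter")
}
```

An authorization hook could follow the same shape by returning an error instead of a filtered batch.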

The preview docs explain this in more detail. They are a good place to see what the hooks are capable of and how to use them.
https://wundergraphinc-topics-streams-v1.mintlify.app/overview

Related pull requests

Two other pull requests contribute to the initial release of Cosmo Streams.

Cosmo Docs: wundergraph/cosmo-docs#187
Engine: wundergraph/graphql-go-tools#1309

Important

This pull request must not be merged before the engine pull request has been merged. The router's go.mod in this pull request currently points to a version of the engine from the Cosmo Streams branch. Once an official engine version that includes these changes is available, this pull request will be updated to use it.

alepane21 and others added 2 commits October 10, 2025 12:13
Co-authored-by: Ludwig Bedacht <[email protected]>
Co-authored-by: StarpTech <[email protected]>
Co-authored-by: Dominik Korittki <[email protected]>
Co-authored-by: Ludwig Bedacht <[email protected]>
Co-authored-by: StarpTech <[email protected]>
Co-authored-by: Dominik Korittki <[email protected]>
@coderabbitai

coderabbitai bot commented Oct 10, 2025

Walkthrough

Adds Cosmo Streams v1: new provider-agnostic stream/event types, hook interfaces and contexts (SubscriptionOnStart, OnReceiveEvents, OnPublishEvents), threads hooks through router core and pubsub layers, refactors provider/adapter/engine datasource APIs to use the new abstractions, and adds tests, mocks and configuration for concurrency/timeouts.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Docs: `adr/cosmo-streams-v1.md`, `rfc/cosmo-streams-v1.md` | Adds ADR/RFC describing Streams v1: ProviderType, StreamEvent/MutableStreamEvent/StreamEvents, hook interfaces/contexts, design notes and examples. |
| Core wiring & GraphQL: `router/core/*.go`, `router/core/subscriptions_modules.go`, `router/core/router_config.go`, `router/core/executor.go`, `router/core/factoryresolver.go`, `router/core/plan_generator.go`, `router/core/graph_server.go`, `router/core/graphql_handler.go`, `router/core/errors.go`, `router/core/supervisor_instance.go` | Introduces subscriptionHooks and hook contexts/handlers, wires hooks through loader/plan/executor/graph server, adds StreamHandlerError handling and WithStreamsHandlerConfiguration option. |
| Pub/Sub datasource core: `router/pkg/pubsub/datasource/*` (e.g., `datasource.go`, `hooks.go`, `pubsubprovider.go`, `subscription_datasource.go`, `subscription_event_updater.go`, `planner.go`, `factory.go`, `mocks*.go`) | Adds Hooks types and container, ProviderType enum, StreamEvent/MutableStreamEvent/StreamEvents types, SetHooks/Publish/Subscribe API changes, PubSub provider hook application with panic recovery, generic PubSub subscription datasource, SubscriptionEventUpdater with concurrency/timeouts/panic recovery, planner/factory signature updates and mock adaptations. |
| Provider adapters & engine datasources: `router/pkg/pubsub/{kafka,nats,redis}/*` (adapters, `engine_datasource*`, `provider_builder*`, `engine_datasource_factory*`, tests, mocks) | Refactors adapters/engine datasources to use datasource.Adapter/datasource.SubscriptionEventUpdater, introduces Event/MutableEvent types, multi-event Publish signatures, new subscription/publish config shapes (provider, rootFieldName), UniqueRequestID wiring, and updates tests/mocks. |
| PubSub integration & build: `router/pkg/pubsub/pubsub.go`, `router/pkg/pubsub/pubsub_test.go` | BuildProvidersAndDataSources now accepts logger and hooks, builds provider map, calls SetHooks on providers, and passes providers/hooks into planner config and factory calls. |
| Router config & schema: `router/pkg/config/*`, `router/pkg/config/config.schema.json`, `router/pkg/config/fixtures/full.yaml`, `router/pkg/config/testdata/*` | Adds StreamsHandlerConfiguration and OnReceiveEvents configuration (max_concurrent_handlers, handler_timeout), wires into EventsConfiguration, updates schema/defaults/fixtures. |
| Tests & test utilities: `router-tests/events/*`, `router-tests/modules/*`, `router-tests/modules/*_test.go`, `router-tests/events/utils.go`, `router-tests/go.mod` | Moves many tests to external test packages, exposes/refactors test helpers, adds Kafka/Redis/NATS utilities (KafkaEnsureTopicExists, ProduceKafkaMessage with timeout, ReadKafkaMessages, ReadRedisMessages), and adds extensive integration/unit tests for hook behavior and modules. |
| Demo changes: `demo/pkg/subgraphs/.../schema.resolvers.go` | Updates demo resolvers to publish via new PublishEventConfiguration and pass []datasource.StreamEvent (MutableEvent) slices. |
| Mocks & mock config: `router/.mockery.yml`, `router/pkg/pubsub/*/mocks*.go` | Updates mock generation config, renames/rewires many mocks to use datasource types, adds SubscriptionUpdater mocks and updated Expecter/Call signatures. |
| Misc / go.mod updates: `router/go.mod`, `router-tests/go.mod` | Module version pins/formatting adjustments. |

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Areas to focus review on:

  • router/pkg/pubsub/datasource/subscription_event_updater.go — concurrency semaphore, per-subscription goroutines, deadline/timeout handling, panic recovery and Close semantics.
  • router/core/subscriptions_modules.go — hook context adapters, NewEvent/EmitLocalEvent semantics, cloning (immutable vs mutable) and data ownership.
  • router/pkg/pubsub/datasource/pubsubprovider.go — applyPublishEventHooks chaining, panic recovery, nil-event filtering and event-builder usage.
  • Provider adapters and engine_datasource code under router/pkg/pubsub/{kafka,nats,redis} — runtime type assertions, multi-event Publish loops, header/key handling, UniqueRequestID computation, JSON template marshaling.
  • Mocks and tests — ensure mock signatures align with new interfaces and updated test utilities/timeouts in router-tests/events/utils.go.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 12.00% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |
✅ Passed checks (2 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The PR title 'feat(router): implement Cosmo Streams v1' clearly and concisely summarizes the main change: implementing a new version (v1) of Cosmo Streams in the router. |


@dkorittki dkorittki changed the title Topic/streams v1 Cosmo Streams v1 Oct 10, 2025
Merge conflict was on go.mod files in router and router-tests,
because a new upstream engine version was used.
I go-getted the latest engine version of the topic branch,
which includes the latest engine changes + cosmo streams.
@github-actions

github-actions bot commented Oct 17, 2025

❌ Internal Query Planner CI checks failed

The Internal Query Planner CI checks failed in the celestial repository, and this is going to stop the merge of this PR.
If you are part of the WunderGraph organization, you can see the PR with more details.

@dkorittki dkorittki changed the title Cosmo Streams v1 feat(router): implement Cosmo Streams v1 Nov 14, 2025
@dkorittki dkorittki requested a review from jensneuse November 14, 2025 11:58
@dkorittki dkorittki marked this pull request as ready for review November 14, 2025 11:59
@dkorittki dkorittki requested a review from JivusAyrus November 14, 2025 11:59
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (2)
router-tests/modules/start_subscription_test.go (2)

173-255: Avoid blocking in StartSubscription callback via unbuffered callbackCalled channel

In the “close to true” test, the callback does:

callbackCalled := make(chan bool)

Callback: func(ctx core.SubscriptionOnStartHandlerContext) error {
    callbackCalled <- true
    return core.NewHttpGraphqlError("subscription closed", http.StatusText(http.StatusOK), http.StatusOK)
}

and the test only reads from callbackCalled after WaitForSubscriptionCount(1, ...).

If the router’s SubscriptionOnStart pipeline runs callbacks on the request’s critical path before incrementing the subscription count, this pattern could deadlock: the callback blocks on the send, the subscription count never reaches 1, and WaitForSubscriptionCount times out.

Even if today’s implementation uses a non‑blocking or separate goroutine for the callback, this test couples itself tightly to that detail.

To make the test robust against pipeline changes, consider one of:

- callbackCalled := make(chan bool)
+ callbackCalled := make(chan bool, 1)

or reordering to wait on the callback first (wrapped in AwaitChannelWithT), then asserting subscription counts.


257-357: Channel length assertion on unbuffered subscriptionOneArgsCh is effectively a no‑op

subscriptionOneArgsCh is created unbuffered:

subscriptionOneArgsCh := make(chan kafkaSubscriptionArgs)

and later you do:

t.Cleanup(func() {
    require.Len(t, subscriptionOneArgsCh, 0)
})

For an unbuffered channel, len(subscriptionOneArgsCh) will always be 0 unless there is a buffered element (which never happens), so this assertion doesn’t actually prove that the callback was never invoked—the real guard is that any send would block and the test would hang.

If you want to assert that the “should not be called” callback never ran in a more explicit way, you could either:

  • Make the channel buffered and check it after the client is closed, e.g.:
- subscriptionOneArgsCh := make(chan kafkaSubscriptionArgs)
+ subscriptionOneArgsCh := make(chan kafkaSubscriptionArgs, 1)
...
t.Cleanup(func() {
    require.Len(t, subscriptionOneArgsCh, 0)
})
  • Or replace the cleanup with a simple boolean flag set in the callback and assert it remains false.

Otherwise, consider dropping the require.Len cleanup to avoid a misleading assertion.
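
A small standalone sketch of both points: `len` on an unbuffered channel is always 0, and a flag set in the callback is a more explicit "never called" check. The type `callFlag` is a hypothetical helper for this example and does not exist in the test suite.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// callFlag records whether a callback ran. It is a hypothetical helper
// for this example.
type callFlag struct {
	called atomic.Bool
}

// Callback marks the flag; a test would install this as the hook that
// should never be invoked.
func (f *callFlag) Callback() error {
	f.called.Store(true)
	return nil
}

func main() {
	unbuffered := make(chan int)
	buffered := make(chan int, 1)
	buffered <- 42

	// len reports queued elements; an unbuffered channel can never hold any.
	fmt.Println(len(unbuffered), cap(unbuffered)) // 0 0
	fmt.Println(len(buffered), cap(buffered))     // 1 1

	var flag callFlag
	fmt.Println(flag.called.Load()) // false
	_ = flag.Callback()
	fmt.Println(flag.called.Load()) // true
}
```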

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 538c0ae and a0ed7e5.

📒 Files selected for processing (6)
  • router-tests/modules/start-subscription/module.go (1 hunks)
  • router-tests/modules/start_subscription_test.go (1 hunks)
  • router-tests/modules/stream-publish/module.go (1 hunks)
  • router-tests/modules/stream-receive/module.go (1 hunks)
  • router/pkg/pubsub/datasource/mocks.go (15 hunks)
  • router/pkg/pubsub/pubsub_test.go (8 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • router-tests/modules/stream-publish/module.go
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
📚 Learning: 2025-08-28T09:18:10.121Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:100-108
Timestamp: 2025-08-28T09:18:10.121Z
Learning: In router-tests/http_subscriptions_test.go heartbeat tests, the message ordering should remain strict with data messages followed by heartbeat messages, as the timing is deterministic and known by design in the Cosmo router implementation.

Applied to files:

  • router-tests/modules/start_subscription_test.go
📚 Learning: 2025-08-28T09:17:49.477Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.

Applied to files:

  • router-tests/modules/start_subscription_test.go
📚 Learning: 2025-10-01T20:39:16.113Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.

Applied to files:

  • router-tests/modules/start_subscription_test.go
  • router/pkg/pubsub/pubsub_test.go
📚 Learning: 2025-11-13T10:10:47.680Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2329
File: router/pkg/pubsub/datasource/subscription_event_updater.go:86-88
Timestamp: 2025-11-13T10:10:47.680Z
Learning: In router/pkg/pubsub/datasource/subscription_event_updater.go, the SetHooks method is intentionally designed to only replace hook handlers, not reconfigure timeout or semaphore settings. The timeout and semaphore fields are meant to be set once during construction via NewSubscriptionEventUpdater and remain immutable. If different timeout or concurrency settings are needed, a new updater instance should be created rather than modifying the existing one.

Applied to files:

  • router-tests/modules/start_subscription_test.go
  • router/pkg/pubsub/pubsub_test.go
  • router/pkg/pubsub/datasource/mocks.go
📚 Learning: 2025-08-20T22:13:25.222Z
Learnt from: StarpTech
Repo: wundergraph/cosmo PR: 2157
File: router-tests/go.mod:16-16
Timestamp: 2025-08-20T22:13:25.222Z
Learning: github.com/mark3labs/mcp-go v0.38.0 has regressions and should not be used in the wundergraph/cosmo project. v0.36.0 is the stable version that should be used across router-tests and other modules.

Applied to files:

  • router/pkg/pubsub/pubsub_test.go
🧬 Code graph analysis (5)
router-tests/modules/start_subscription_test.go (9)
router-tests/testenv/testenv.go (2)
  • Run (105-122)
  • Environment (1733-1769)
router-tests/modules/start-subscription/module.go (1)
  • StartSubscriptionModule (13-17)
router/core/router.go (3)
  • Option (172-172)
  • WithModulesConfig (1679-1683)
  • WithCustomModules (1786-1790)
router-tests/testenv/utils.go (1)
  • AwaitChannelWithT (10-19)
router/core/subscriptions_modules.go (1)
  • SubscriptionOnStartHandlerContext (15-31)
router/pkg/pubsub/kafka/engine_datasource.go (2)
  • SubscriptionEventConfiguration (91-95)
  • MutableEvent (62-66)
router/pkg/pubsub/datasource/provider.go (1)
  • SubscriptionEventConfiguration (101-105)
router/core/http_graphql_error.go (1)
  • NewHttpGraphqlError (22-28)
router/core/context.go (1)
  • RequestContext (61-142)
router-tests/modules/start-subscription/module.go (3)
router/core/subscriptions_modules.go (2)
  • SubscriptionOnStartHandlerContext (15-31)
  • SubscriptionOnStartHandler (180-184)
router/core/context.go (1)
  • RequestContext (61-142)
router/core/modules.go (4)
  • ModuleContext (160-164)
  • Module (52-54)
  • ModuleInfo (44-50)
  • EnginePostOriginHandler (132-136)
router/pkg/pubsub/pubsub_test.go (2)
router/pkg/pubsub/datasource/hooks.go (5)
  • Hooks (17-21)
  • OnReceiveEventsHooks (34-38)
  • OnReceiveEventsFn (14-14)
  • OnPublishEventsHooks (29-31)
  • OnPublishEventsFn (12-12)
router/pkg/metric/noop_stream_metrics.go (1)
  • NewNoopStreamMetricStore (15-15)
router/pkg/pubsub/datasource/mocks.go (4)
router/pkg/pubsub/kafka/engine_datasource.go (3)
  • SubscriptionDataSource (209-211)
  • PublishEventConfiguration (188-192)
  • SubscriptionEventConfiguration (91-95)
router/pkg/pubsub/redis/engine_datasource.go (3)
  • SubscriptionDataSource (163-165)
  • PublishEventConfiguration (141-145)
  • SubscriptionEventConfiguration (64-68)
router/pkg/pubsub/datasource/datasource.go (2)
  • SubscriptionDataSource (8-13)
  • EngineDataSourceFactory (23-39)
router/pkg/pubsub/datasource/subscription_event_updater.go (1)
  • SubscriptionEventUpdater (21-26)
router-tests/modules/stream-receive/module.go (3)
router/core/subscriptions_modules.go (2)
  • StreamReceiveEventHandlerContext (247-263)
  • StreamReceiveEventHandler (265-275)
router/pkg/pubsub/datasource/provider.go (1)
  • StreamEvents (61-63)
router/core/modules.go (3)
  • ModuleContext (160-164)
  • Module (52-54)
  • ModuleInfo (44-50)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
  • GitHub Check: build-router
  • GitHub Check: integration_test (./telemetry)
  • GitHub Check: build_push_image (nonroot)
  • GitHub Check: integration_test (./. ./fuzzquery ./lifecycle ./modules)
  • GitHub Check: image_scan
  • GitHub Check: image_scan (nonroot)
  • GitHub Check: build_push_image
  • GitHub Check: build_test
  • GitHub Check: integration_test (./events)
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: Analyze (go)
🔇 Additional comments (13)
router-tests/modules/stream-receive/module.go (5)

10-15: LGTM!

The module structure is well-defined with appropriate public fields for test flexibility. The Callback field signature correctly matches the OnReceiveEvents method signature, enabling test injection.


17-22: LGTM!

The Provision method follows the standard module initialization pattern and correctly captures the logger for later use.


24-34: LGTM!

The OnReceiveEvents implementation correctly:

  • Uses defensive nil checks for optional fields
  • Delegates to the callback when provided
  • Returns events unchanged when no callback is set
  • Properly propagates errors from the callback

The defensive nil check for Logger is appropriate for a test module where instances might be created without proper initialization.


36-46: LGTM!

The Module method correctly returns module metadata with appropriate priority for a test module. The constructor pattern is standard and allows tests to configure the module after instantiation.


48-51: LGTM!

The interface guard is a best practice that ensures compile-time verification of the core.StreamReceiveEventHandler interface implementation.
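
For readers who have not seen the idiom, here is a minimal sketch of an interface guard combined with the optional-callback delegation described above. `EventHandler` and `ReceiveModule` are simplified, hypothetical stand-ins. The real core.StreamReceiveEventHandler interface has a different signature.

```go
package main

import "fmt"

// EventHandler is a hypothetical, simplified hook interface.
type EventHandler interface {
	OnReceiveEvents(events []string) ([]string, error)
}

// ReceiveModule is a test-style module with an optional callback.
type ReceiveModule struct {
	Callback func(events []string) ([]string, error)
}

// OnReceiveEvents delegates to the callback when one is set and otherwise
// returns the events unchanged.
func (m *ReceiveModule) OnReceiveEvents(events []string) ([]string, error) {
	if m.Callback != nil {
		return m.Callback(events)
	}
	return events, nil
}

// Interface guard: compilation fails here if *ReceiveModule ever stops
// satisfying EventHandler. The typed nil pointer allocates nothing.
var _ EventHandler = (*ReceiveModule)(nil)

func main() {
	m := &ReceiveModule{}
	out, _ := m.OnReceiveEvents([]string{"a", "b"})
	fmt.Println(out) // [a b]

	m.Callback = func(events []string) ([]string, error) {
		return events[:1], nil
	}
	out, _ = m.OnReceiveEvents([]string{"a", "b"})
	fmt.Println(out) // [a]
}
```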

router-tests/modules/start_subscription_test.go (4)

21-171: Solid coverage of basic hook invocation and Kafka write path

The first two subtests exercise both the “no-op” StartSubscription hook (log-only) and the Kafka write path end‑to‑end, including subscription lifecycle and log observation. The client.Run goroutine is correctly bounded via client.Close() and AwaitChannelWithT, which should prevent leaks and flakiness.


360-446: Good verification of HttpGraphqlError propagation to GraphQL client

The “error is propagated to the client” test does a nice job of asserting that a NewHttpGraphqlError from the StartSubscription hook surfaces as graphql.Errors with the expected statusCode and code extensions. This gives strong coverage of the new error path and its client‑visible shape.


448-584: Engine subscription tests correctly cover StartSubscription for in‑router streams

The engine subscription tests validate that the StartSubscription hook:

  • Runs for engine‑backed subscriptions (without Kafka config), and
  • Can inject an initial event via ctx.NewEvent/ctx.WriteEvent that precedes engine‑produced events.

The two AwaitChannelWithT calls for {"countEmp": 1000} and then {"countEmp": 0} give clear ordering guarantees, and the client lifecycle is properly closed via client.Close() and the clientRunCh channel.


586-661: OnOriginResponse behavior now matches test name and expectation

This test explicitly asserts that when the StartSubscription hook returns an error, CallbackOnOriginResponse is not invoked by checking require.Empty(t, originResponseCalled) on a buffered channel. That aligns with the test name and resolves the previous mismatch between name and behavior.

router-tests/modules/start-subscription/module.go (1)

1-62: StartSubscriptionModule implementation is minimal, safe, and correctly wired

  • Provision now only assigns ctx.Logger and SubscriptionOnStart guards with if m.Logger != nil, resolving the nil‑logger panic risk from earlier review.
  • SubscriptionOnStart and OnOriginResponse cleanly delegate to optional callbacks without side effects when unset.
  • Module() uses the shared myModuleID constant, sets a reasonable priority, and exposes a New factory, and the interface guards ensure compile‑time compatibility with SubscriptionOnStartHandler and EnginePostOriginHandler.

This module is well‑structured for both tests and real integrations.

router/pkg/pubsub/pubsub_test.go (2)

71-71: Past review comment addressed.

The BuildProvider mock expectation now correctly matches the two-argument signature (datasource config and metric store).


305-305: LGTM: Hooks parameter consistently added across all provider integration tests.

The BuildProvidersAndDataSources signature has been correctly updated for NATS, Kafka, and Redis integration tests with the hooks parameter.

Also applies to: 358-358, 411-411

router/pkg/pubsub/datasource/mocks.go (1)

201-253: LGTM: ResolveDataSourceSubscription return type correctly updated.

The generated mock now returns SubscriptionDataSource instead of resolve.SubscriptionDataSource, matching the updated interface definition.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
router-tests/modules/stream-publish/module.go (1)

10-10: Consider renaming for consistency with sibling module.

The module ID "publishModule" could follow the same naming pattern as the sibling StreamReceiveModule which uses "streamReceiveModule". Consider "streamPublishModule" for consistency.

-const myModuleID = "publishModule"
+const myModuleID = "streamPublishModule"
router-tests/modules/stream-receive/module.go (1)

26-26: Consider a more specific log message for test clarity.

The message "Stream Hook has been run" is less specific than the sibling PublishModule's "Publish Hook has been run". For easier test debugging, consider "Receive Hook has been run".

-		m.Logger.Info("Stream Hook has been run")
+		m.Logger.Info("Receive Hook has been run")
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a0ed7e5 and ef85d0a.

📒 Files selected for processing (3)
  • router-tests/modules/stream-publish/module.go (1 hunks)
  • router-tests/modules/stream-receive/module.go (1 hunks)
  • router/pkg/pubsub/pubsub_test.go (8 hunks)
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
📚 Learning: 2025-11-13T10:10:47.680Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2329
File: router/pkg/pubsub/datasource/subscription_event_updater.go:86-88
Timestamp: 2025-11-13T10:10:47.680Z
Learning: In router/pkg/pubsub/datasource/subscription_event_updater.go, the SetHooks method is intentionally designed to only replace hook handlers, not reconfigure timeout or semaphore settings. The timeout and semaphore fields are meant to be set once during construction via NewSubscriptionEventUpdater and remain immutable. If different timeout or concurrency settings are needed, a new updater instance should be created rather than modifying the existing one.

Applied to files:

  • router/pkg/pubsub/pubsub_test.go
📚 Learning: 2025-10-01T20:39:16.113Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.

Applied to files:

  • router/pkg/pubsub/pubsub_test.go
📚 Learning: 2025-08-28T09:17:49.477Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.

Applied to files:

  • router/pkg/pubsub/pubsub_test.go
📚 Learning: 2025-08-20T22:13:25.222Z
Learnt from: StarpTech
Repo: wundergraph/cosmo PR: 2157
File: router-tests/go.mod:16-16
Timestamp: 2025-08-20T22:13:25.222Z
Learning: github.com/mark3labs/mcp-go v0.38.0 has regressions and should not be used in the wundergraph/cosmo project. v0.36.0 is the stable version that should be used across router-tests and other modules.

Applied to files:

  • router/pkg/pubsub/pubsub_test.go
🧬 Code graph analysis (3)
router/pkg/pubsub/pubsub_test.go (2)
router/pkg/pubsub/datasource/hooks.go (5)
  • Hooks (17-21)
  • OnReceiveEventsHooks (34-38)
  • OnReceiveEventsFn (14-14)
  • OnPublishEventsHooks (29-31)
  • OnPublishEventsFn (12-12)
router/pkg/metric/noop_stream_metrics.go (1)
  • NewNoopStreamMetricStore (15-15)
router-tests/modules/stream-receive/module.go (4)
router/pkg/mcpserver/util.go (1)
  • Logger (6-9)
router/core/subscriptions_modules.go (2)
  • StreamReceiveEventHandlerContext (247-263)
  • StreamReceiveEventHandler (265-275)
router/pkg/pubsub/datasource/provider.go (1)
  • StreamEvents (61-63)
router/core/modules.go (3)
  • ModuleContext (160-164)
  • Module (52-54)
  • ModuleInfo (44-50)
router-tests/modules/stream-publish/module.go (3)
router/core/subscriptions_modules.go (2)
  • StreamPublishEventHandlerContext (277-290)
  • StreamPublishEventHandler (292-299)
router/pkg/pubsub/datasource/provider.go (1)
  • StreamEvents (61-63)
router/core/modules.go (3)
  • ModuleContext (160-164)
  • Module (52-54)
  • ModuleInfo (44-50)
🔇 Additional comments (3)
router-tests/modules/stream-publish/module.go (1)

1-51: LGTM! Past issues resolved, module implementation is solid.

The nil logger guard and overall structure look good. The module correctly implements the StreamPublishEventHandler interface with appropriate safety checks.

router-tests/modules/stream-receive/module.go (1)

1-51: LGTM! Package naming corrected, implementation is sound.

The module correctly implements the StreamReceiveEventHandler interface with proper safety checks. Past package name issue has been resolved.

router/pkg/pubsub/pubsub_test.go (1)

6-6: LGTM! Tests correctly updated for hooks integration.

The test changes properly wire the new datasource.Hooks parameter through all build() and BuildProvidersAndDataSources() calls. The SetHooks mock expectations correctly omit .Return() since SetHooks is a void method. Passing empty datasource.Hooks{} in all tests is appropriate for unit tests that focus on the build plumbing rather than hook behavior.

Also applies to: 65-68, 75-75, 131-131, 186-186, 244-247, 254-254, 305-305, 358-358, 411-411


}, "unable to close client before timeout")

requestLog := xEnv.Observer().FilterMessage("SubscriptionOnStart Hook has been run")
Member

How about using a mock implementation that increments a counter and asserting on the number of hook calls instead of filtering on a message?

Contributor Author

Done, using a counter on the custom module now, which increases whenever the hook is called.
Commit: chore: use counter to count hook calls
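
A minimal sketch of the counter approach, assuming hooks may be invoked from several goroutines. The type `countingModule` and its method are hypothetical and only resemble the test module in spirit.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countingModule is a hypothetical test module that counts hook invocations.
type countingModule struct {
	hookCalls atomic.Int64
}

// SubscriptionOnStart stands in for the hook; it only bumps the counter.
func (m *countingModule) SubscriptionOnStart() error {
	m.hookCalls.Add(1)
	return nil
}

func main() {
	m := &countingModule{}

	// Simulate three subscriptions starting concurrently.
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = m.SubscriptionOnStart()
		}()
	}
	wg.Wait()

	fmt.Println("hook calls:", m.hookCalls.Load()) // hook calls: 3
}
```

Asserting on the counter avoids coupling the test to the exact wording of a log message.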

}
ctx.WriteEvent((&kafka.MutableEvent{
Key: []byte("1"),
Data: []byte(`{"id": 1, "__typename": "Employee"}`),
Member

Can we access the id from the ctx here?

Contributor Author

Can you specify which id you mean? The one from Data? Unfortunately no, that's not possible at the moment.

"startSubscriptionModule": start_subscription.StartSubscriptionModule{
Callback: func(ctx core.SubscriptionOnStartHandlerContext) error {
callbackCalled <- true
return core.NewHttpGraphqlError("subscription closed", http.StatusText(http.StatusOK), http.StatusOK)
Member

Is the intention here to gracefully stop a subscription, or is this just returning a generic error? If the intention is a graceful stop, could we find a more idiomatic way to close a subscription?

Contributor Author

@dkorittki dkorittki Nov 18, 2025

The intention is to give hook developers the option to abort the subscription during the initialization phase. Suppose you return from the hook like this:

return core.NewHttpGraphqlError("Forbidden", http.StatusText(http.StatusForbidden), http.StatusForbidden)

On the connection, this is what you get

 > {"type":"connection_init","payload":{}}
 < {"type":"connection_ack"}
 > {"id":"1","type":"subscribe","payload":{"query":"subscription { employeeUpdatedMyKafka(employeeID:1) { id details { forename surname } } }"}}
 < {"id":"1","type":"error","payload":[{"message":"Forbidden","extensions":{"statusCode":403,"code":"Forbidden"}}]}
Disconnected (code: 1000, reason: "Normal closure")

So first an error message is returned after the subscription starts (not at connection_init), then the WebSocket connection is closed normally by the router. Do you have any specific ideas in mind on how to close the connection instead?
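To make the mapping between the hook's return value and the error frame above easier to follow, here is a minimal sketch that rebuilds the same frame from a message, a code, and a status. The struct types and the `buildErrorFrame` helper are hypothetical stand-ins written for this illustration; they are not the router's internal types.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// The types below are hypothetical stand-ins that mirror the frame shown
// above; they are not the router's internal types.
type errorExtensions struct {
	StatusCode int    `json:"statusCode"`
	Code       string `json:"code"`
}

type graphqlError struct {
	Message    string          `json:"message"`
	Extensions errorExtensions `json:"extensions"`
}

type errorFrame struct {
	ID      string         `json:"id"`
	Type    string         `json:"type"`
	Payload []graphqlError `json:"payload"`
}

// buildErrorFrame assembles the "error" frame for one subscription ID from
// a message, an extension code, and an HTTP status code.
func buildErrorFrame(id, message, code string, status int) (string, error) {
	frame := errorFrame{
		ID:   id,
		Type: "error",
		Payload: []graphqlError{{
			Message:    message,
			Extensions: errorExtensions{StatusCode: status, Code: code},
		}},
	}
	out, err := json.Marshal(frame)
	return string(out), err
}

func main() {
	frame, err := buildErrorFrame("1", "Forbidden", http.StatusText(http.StatusForbidden), http.StatusForbidden)
	if err != nil {
		panic(err)
	}
	fmt.Println(frame)
}
```

The three arguments correspond to the three arguments passed to core.NewHttpGraphqlError in the hook: the message, the code placed in extensions, and the status code.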

Member

@jensneuse jensneuse Nov 18, 2025

Could we return a special error?

return core.CloseSubscription{
    Reason: "my custom message",
}

You can then do errors.As/errors.Is to react.
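A minimal sketch of how that could look, assuming a dedicated error type. `CloseSubscription` and `closeReason` below are hypothetical names taken from or invented for this suggestion, not existing identifiers in the core package; only `errors.As` and `fmt.Errorf` are real standard library calls.

```go
package main

import (
	"errors"
	"fmt"
)

// CloseSubscription is a hypothetical error type modelled on the suggestion
// above; it is not an existing type in the core package.
type CloseSubscription struct {
	Reason string
}

func (e *CloseSubscription) Error() string {
	return "subscription closed: " + e.Reason
}

// closeReason reports whether err is, or wraps, a *CloseSubscription and
// returns its reason if so.
func closeReason(err error) (string, bool) {
	var closeErr *CloseSubscription
	if errors.As(err, &closeErr) {
		return closeErr.Reason, true
	}
	return "", false
}

func main() {
	// A hook might wrap the error with extra context before returning it.
	hookErr := fmt.Errorf("start hook: %w", &CloseSubscription{Reason: "my custom message"})

	if reason, ok := closeReason(hookErr); ok {
		fmt.Println("closing gracefully:", reason)
	} else {
		fmt.Println("generic failure:", hookErr)
	}
}
```

Because `errors.As` walks the wrap chain, the router side could distinguish an intentional close from any other hook failure even when the hook adds context with `%w`.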

Contributor Author

After discussion with Jens, I have now added a dedicated type for this: core.StreamHandlerError.
Commit: chore: reintroduce StreamHandlerError

SubscriptionEventConfiguration() datasource.SubscriptionEventConfiguration
// WriteEvent writes an event to the stream of the current subscription
// It returns true if the event was written to the stream, false if the event was dropped
WriteEvent(event datasource.StreamEvent) bool
Member

I think we should have much stronger naming differentiation between WriteEvent and NewEvent.
My understanding from reading the tests is that WriteEvent goes through the event broker (e.g. Kafka), so it fans out to all consumers, whereas NewEvent is local to the subscription, but I'm not entirely sure.
I've searched through the ADR and it's not clear what the differentiation is.
Can you clarify?

Contributor Author

Ah, I see the confusion.

WriteEvent creates an event inside the router. For the router it looks like an event coming straight from a broker. It resolves the fields asked for by the operation and returns the results to the client. The method expects an event object, which you can create with NewEvent.

I will update the godoc and ADR to make this more clear.

Member

So, WriteEvent is not really writing an event to the broker, which I thought it would, but instead it's just emitting a new event, but not to other Routers through the broker, only to the current Router? Can we make it clear that this event is only for the current subscription? or is it for all Router local subscriptions? and could it also be delivered back to the broker to fan it out? Maybe we want to offer the user options. In any way, we want to be very explicit in regards to naming such that the side effects are very clear. "WriteEvent" is very generic and can mean a lot of things.

Contributor Author

@dkorittki dkorittki Nov 18, 2025

So, WriteEvent is not really writing an event to the broker, which I thought it would, but instead it's just emitting a new event, but not to other Routers through the broker, only to the current Router?

Yes, it's only emitting an event inside the router. No broker will ever receive that message.

Can we make it clear that this event is only for the current subscription? or is it for all Router local subscriptions?

The event is local to the subscription the hook is being called for. Other subscriptions will never see this event. I will address this in godoc and Cosmo docs.

and could it also be delivered back to the broker to fan it out? Maybe we want to offer the user options.

It's a good idea but at the moment we don't support this. Right now the main goal of the WriteEvent method was to provide hook developers a way to push initial data to new subscription clients, so they can build up state without waiting for an actual event to arrive from the broker.

But I do see how it can be valuable to push events to brokers when a new client connects. Not sure how complicated it is to add but I believe it's not something we can easily add to this hook. Probably involves some more work. Will look into it to see if we could easily add this.

Overall I agree the method sounds too vague and doesn't really tell what it's actually doing. Gonna rename the method and clarify in godoc and Cosmo Streams what's actually happening.
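To illustrate the scoping described above, here is a small sketch under stated assumptions. The `subscription` type and `emitLocal` method are hypothetical stand-ins, and the drop condition (a full buffer) is an assumption made for the example; the godoc only says that false means the event was dropped.

```go
package main

import "fmt"

// subscription is a hypothetical stand-in for one client subscription with
// its own bounded event queue; the router's real types differ.
type subscription struct {
	id     string
	events chan string
}

func newSubscription(id string, buffer int) *subscription {
	return &subscription{id: id, events: make(chan string, buffer)}
}

// emitLocal queues an event for this subscription only. It returns true if
// the event was queued and false if it was dropped because the queue is full.
func (s *subscription) emitLocal(event string) bool {
	select {
	case s.events <- event:
		return true
	default:
		return false
	}
}

func main() {
	a := newSubscription("a", 1)
	b := newSubscription("b", 1)

	fmt.Println(a.emitLocal(`{"id": 1}`))     // queued for a
	fmt.Println(a.emitLocal(`{"id": 2}`))     // a's queue is full, so dropped
	fmt.Println(len(a.events), len(b.events)) // b never sees a's events
}
```

Nothing in this sketch talks to a broker: the event only ever lands in the queue of the subscription it was emitted on.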

Contributor Author

Renamed the method to EmitLocalEvent with a more detailed godoc.
Commit: chore: rename WriteEvent to EmitLocalEvent

Member

You should update the ADR, right?

The core.NewHttpGraphqlError() method returns an error, which does not fit the
OnPublishEvent hook very well. Instead we return a generic error, which results in
the same thing. The error handling does not behave differently because of it.

If this hook returns an error, the error should not be logged.
Other Cosmo Streams hooks don't log it either. The reason is that we want the hook developer
to decide which log level to use for errors, or whether to log them at all.
This error log handling is already in place for the other two hooks but somehow
was forgotten on this one.

This makes the OnPublishEvent hook more consistent with the OnReceiveEvent hook in nil event filtering.
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (2)
router/pkg/pubsub/datasource/pubsubprovider.go (1)

21-53: Consider adding observability for hook failures and nil event cleanup.

The panic recovery and hook processing would benefit from additional context:

  1. Hook identification in panic recovery: When a hook panics, the stack trace doesn't identify which specific hook failed, making debugging harder in production when multiple hooks are registered.

  2. Silent nil event removal: Lines 48-50 silently filter nil events. If a hook incorrectly returns nil events, this will mask the bug without any diagnostic signal.

  3. Hook error logging: When a hook returns an error (line 43), the loop breaks but the error isn't logged here, losing context about which hook in the chain failed.

Consider these improvements:

 func (p *PubSubProvider) applyPublishEventHooks(ctx context.Context, cfg PublishEventConfiguration, events []StreamEvent) (currentEvents []StreamEvent, err error) {
 	defer func() {
 		if r := recover(); r != nil {
 			p.Logger.
 				WithOptions(zap.AddStacktrace(zapcore.ErrorLevel)).
-				Error("[Recovery from handler panic]", zap.Any("error", r))
+				Error("[Recovery from publish hook panic]", 
+					zap.Any("error", r),
+					zap.String("provider_id", cfg.ProviderID()),
+					zap.String("root_field", cfg.RootFieldName()))
 
 			switch v := r.(type) {
 			case error:
 				err = v
 			default:
 				err = fmt.Errorf("%v", r)
 			}
 		}
 	}()
 
 	currentEvents = events
 
-	for _, hook := range p.hooks.OnPublishEvents.Handlers {
+	for i, hook := range p.hooks.OnPublishEvents.Handlers {
 		currentEvents, err = hook(ctx, cfg, currentEvents, p.eventBuilder)
 		if err != nil {
+			p.Logger.Error("publish hook returned error",
+				zap.Int("hook_index", i),
+				zap.Error(err),
+				zap.String("provider_id", cfg.ProviderID()))
 			break
 		}
 	}
 
+	nilCount := 0
 	currentEvents = slices.DeleteFunc(currentEvents, func(event StreamEvent) bool {
-		return event == nil
+		isNil := event == nil
+		if isNil {
+			nilCount++
+		}
+		return isNil
 	})
+	if nilCount > 0 {
+		p.Logger.Warn("removed nil events after hook processing",
+			zap.Int("nil_count", nilCount),
+			zap.String("provider_id", cfg.ProviderID()))
+	}
 
 	return currentEvents, err
 }
router-tests/modules/stream_receive_test.go (1)

695-727: Consider adding a small sleep in the busy loop.

The loop at lines 695-727 doesn't yield, which could waste CPU cycles. While this is acceptable in tests and the loop should exit quickly, consider adding a small time.Sleep(time.Millisecond) if CI performance becomes a concern.

 								for {
 									current := currentHandlers.Load()
 									max := maxCurrentHandlers.Load()
 
 									if current > max {
 										maxCurrentHandlers.CompareAndSwap(max, current)
 									}
 
 									if current >= int32(tc.maxConcurrent) {
 										// wait to see if the subscription-updater spawns too many concurrent hooks,
 										// i.e. exceeding the number of configured max concurrent hooks.
 										deadline := time.Now().Add(300 * time.Millisecond)
 										for time.Now().Before(deadline) {
 											if currentHandlers.Load() > int32(tc.maxConcurrent) {
 												break
 											}
+											time.Sleep(time.Millisecond)
 										}
 										break
 									}
 
 									// Let hooks continue if we never reach a updater batch size = tc.maxConcurrent
 									// because there are not enough remaining clients to be updated.
 									// i.e. it could be the last round of updates:
 									// 100 clients, now in comes a new event from broker, max concurrent hooks = 30.
 									// First round: 30 hooks run, 70 remaining.
 									// Second round: 30 hooks run, 40 remaining.
 									// Third round: 30 hooks run, 10 remaining.
 									// Fourth round: 10 hooks run, then we end up here because remainingSubs < tc.maxConcurrent.
 									remainingSubs := tc.numSubscribers - int(finishedHandlers.Load())
 									if remainingSubs < tc.maxConcurrent {
 										break
 									}
+									time.Sleep(time.Millisecond)
 								}
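The peak-tracking idea in the loop above can be isolated into a small, self-contained sketch. This is not the test's code: `recordPeak` and `runBounded` are hypothetical helpers, and the buffered channel is just one common way to cap concurrency. Unlike a single CompareAndSwap, the helper retries when another goroutine updates the peak in between.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// recordPeak raises peak to current if current is larger, retrying when a
// concurrent writer changes peak between the load and the swap.
func recordPeak(peak *atomic.Int32, current int32) {
	for {
		old := peak.Load()
		if current <= old || peak.CompareAndSwap(old, current) {
			return
		}
	}
}

// runBounded runs tasks handlers with at most limit in flight and returns
// the highest concurrency observed.
func runBounded(tasks, limit int) int32 {
	var running, peak atomic.Int32
	sem := make(chan struct{}, limit) // counting semaphore
	var wg sync.WaitGroup

	for i := 0; i < tasks; i++ {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit handlers are in flight
		go func() {
			defer wg.Done()
			recordPeak(&peak, running.Add(1))
			time.Sleep(time.Millisecond) // simulated hook work
			running.Add(-1)
			<-sem
		}()
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	fmt.Println("peak concurrency:", runBounded(20, 3))
}
```

Because a slot is acquired before each goroutine starts and released only after the running counter is decremented, the recorded peak cannot exceed the limit.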
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ef85d0a and f07f077.

📒 Files selected for processing (4)
  • router-tests/modules/start_subscription_test.go (1 hunks)
  • router-tests/modules/stream_publish_test.go (1 hunks)
  • router-tests/modules/stream_receive_test.go (1 hunks)
  • router/pkg/pubsub/datasource/pubsubprovider.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • router-tests/modules/stream_publish_test.go
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
📚 Learning: 2025-08-28T09:18:10.121Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:100-108
Timestamp: 2025-08-28T09:18:10.121Z
Learning: In router-tests/http_subscriptions_test.go heartbeat tests, the message ordering should remain strict with data messages followed by heartbeat messages, as the timing is deterministic and known by design in the Cosmo router implementation.

Applied to files:

  • router-tests/modules/stream_receive_test.go
  • router-tests/modules/start_subscription_test.go
📚 Learning: 2025-10-01T20:39:16.113Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.

Applied to files:

  • router-tests/modules/stream_receive_test.go
  • router-tests/modules/start_subscription_test.go
📚 Learning: 2025-11-13T10:10:47.680Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2329
File: router/pkg/pubsub/datasource/subscription_event_updater.go:86-88
Timestamp: 2025-11-13T10:10:47.680Z
Learning: In router/pkg/pubsub/datasource/subscription_event_updater.go, the SetHooks method is intentionally designed to only replace hook handlers, not reconfigure timeout or semaphore settings. The timeout and semaphore fields are meant to be set once during construction via NewSubscriptionEventUpdater and remain immutable. If different timeout or concurrency settings are needed, a new updater instance should be created rather than modifying the existing one.

Applied to files:

  • router-tests/modules/stream_receive_test.go
  • router-tests/modules/start_subscription_test.go
  • router/pkg/pubsub/datasource/pubsubprovider.go
🧬 Code graph analysis (3)
router-tests/modules/stream_receive_test.go (11)
router-tests/testenv/testenv.go (2)
  • Run (105-122)
  • Environment (1733-1769)
router/pkg/config/config.go (4)
  • Config (1017-1092)
  • Graph (24-29)
  • SubscriptionHooksConfiguration (648-650)
  • OnReceiveEventsConfiguration (652-655)
router-tests/modules/stream-receive/module.go (1)
  • StreamReceiveModule (12-15)
router/core/router.go (5)
  • Option (172-172)
  • WithModulesConfig (1679-1683)
  • WithCustomModules (1786-1790)
  • WithAccessController (1839-1843)
  • WithSubscriptionHooks (2143-2148)
router-tests/events/utils.go (2)
  • KafkaEnsureTopicExists (15-32)
  • ProduceKafkaMessage (34-53)
router-tests/testenv/utils.go (1)
  • AwaitChannelWithT (10-19)
router/core/subscriptions_modules.go (1)
  • StreamReceiveEventHandlerContext (247-263)
router/pkg/pubsub/datasource/provider.go (4)
  • StreamEvents (61-63)
  • StreamEvent (86-91)
  • NewStreamEvents (81-83)
  • MutableStreamEvent (94-98)
router/pkg/authentication/jwks_token_decoder.go (3)
  • NewJwksTokenDecoder (78-241)
  • JWKSConfig (44-56)
  • TokenDecoder (21-23)
router/pkg/authentication/http_header_authenticator.go (2)
  • HttpHeaderAuthenticatorOptions (81-89)
  • NewHttpHeaderAuthenticator (93-113)
router/core/access_controller.go (2)
  • NewAccessController (37-44)
  • AccessControllerOptions (20-25)
router-tests/modules/start_subscription_test.go (7)
router-tests/testenv/testenv.go (3)
  • Run (105-122)
  • LogObservationConfig (388-391)
  • Environment (1733-1769)
router-tests/modules/start-subscription/module.go (1)
  • StartSubscriptionModule (13-17)
router/core/router.go (3)
  • Option (172-172)
  • WithModulesConfig (1679-1683)
  • WithCustomModules (1786-1790)
router-tests/testenv/utils.go (1)
  • AwaitChannelWithT (10-19)
router/core/subscriptions_modules.go (1)
  • SubscriptionOnStartHandlerContext (15-31)
router/pkg/pubsub/kafka/engine_datasource.go (2)
  • SubscriptionEventConfiguration (91-95)
  • MutableEvent (62-66)
router/core/http_graphql_error.go (1)
  • NewHttpGraphqlError (22-28)
router/pkg/pubsub/datasource/pubsubprovider.go (4)
router/pkg/pubsub/datasource/provider.go (4)
  • Adapter (24-28)
  • PublishEventConfiguration (108-112)
  • StreamEvent (86-91)
  • SubscriptionEventConfiguration (101-105)
router/pkg/pubsub/datasource/hooks.go (1)
  • Hooks (17-21)
router/pkg/pubsub/datasource/subscription_datasource.go (1)
  • EventBuilderFn (16-16)
router/pkg/pubsub/datasource/subscription_event_updater.go (1)
  • SubscriptionEventUpdater (21-26)
🔇 Additional comments (13)
router/pkg/pubsub/datasource/pubsubprovider.go (4)

81-94: Verify error handling semantics when hooks fail.

The current implementation publishes events even when applyPublishEventHooks returns an error:

processedEvents, hooksErr := p.applyPublishEventHooks(ctx, cfg, events)

errPublish := p.Adapter.Publish(ctx, cfg, processedEvents)
if errPublish != nil {
    return errPublish
}

return hooksErr

This means:

  • If hooks fail but publish succeeds, the hook error is returned to the caller after events have already been published
  • If hooks panic or return an error, publishing still proceeds with whatever events the hooks produced before failing

Is this the intended behavior? Typically, middleware hooks either:

  1. Gate the operation: Hook failure prevents publishing entirely
  2. Observe only: Hook failure is logged but doesn't affect the return value

The current hybrid approach (publish then return hook error) could lead to confusion about whether the publish succeeded.

Consider clarifying the intended semantics:

Option 1: Hooks gate publishing

processedEvents, hooksErr := p.applyPublishEventHooks(ctx, cfg, events)
if hooksErr != nil {
    return hooksErr  // Don't publish if hooks fail
}
return p.Adapter.Publish(ctx, cfg, processedEvents)

Option 2: Hooks are observability-only

processedEvents, hooksErr := p.applyPublishEventHooks(ctx, cfg, events)
if hooksErr != nil {
    p.Logger.Error("publish hooks failed but continuing", zap.Error(hooksErr))
}
return p.Adapter.Publish(ctx, cfg, processedEvents)  // Only return publish errors

77-79: LGTM!

Clean delegation to the adapter. Hook processing for subscriptions is correctly handled at the updater layer rather than in this provider wrapper.


96-98: LGTM!

Simple hook configuration method that correctly follows the design pattern where hooks can be set independently of other configuration.

Based on learnings


100-112: LGTM - Previous nil logger concern addressed.

The constructor correctly normalizes nil logger to zap.NewNop(), which prevents panics in applyPublishEventHooks and other logging paths. All fields including the new eventBuilder are properly initialized.

router-tests/modules/stream_receive_test.go (8)

1-36: LGTM!

Package structure and imports are well-organized for comprehensive integration testing of the StreamReceiveModule with Kafka.


37-112: LGTM!

Test properly verifies hook invocation with appropriate cleanup and timeout handling.


114-201: LGTM!

Test correctly demonstrates the clone-then-mutate pattern for event transformation and verifies the results.


298-460: LGTM!

Comprehensive test of authentication-based event transformation with proper setup, assertions, and cleanup for multiple clients.


462-562: LGTM!

Test correctly verifies custom header access in hooks with proper use of canonical header keys.


564-640: LGTM!

Test properly verifies that hook errors trigger graceful subscription closure and cleanup.


736-825: LGTM!

Comprehensive concurrency test with proper setup, multiple clients, and verification that concurrent handler limits are respected.


827-951: LGTM!

Test correctly verifies that hooks exceeding the timeout don't block subsequent events, resulting in out-of-order delivery. The logic and assertions properly demonstrate the timeout mechanism.

router-tests/modules/start_subscription_test.go (1)

21-612: Solid end-to-end coverage of SubscriptionOnStart and engine flows

The suite thoroughly exercises: basic hook invocation, Kafka/stream WriteEvent behavior, per-subscription targeting, error propagation (including HTTP GraphQL error metadata), engine-based subscriptions, and interaction with OnOriginResponse, all via testenv.Run and the websocket client. This gives strong regression coverage for the new Cosmo Streams v1 start-subscription hook wiring.

Comment on lines 188 to 253
callbackCalled := make(chan bool)

cfg := config.Config{
	Graph: config.Graph{},
	Modules: map[string]interface{}{
		"startSubscriptionModule": start_subscription.StartSubscriptionModule{
			Callback: func(ctx core.SubscriptionOnStartHandlerContext) error {
				callbackCalled <- true
				return core.NewHttpGraphqlError("subscription closed", http.StatusText(http.StatusOK), http.StatusOK)
			},
		},
	},
}

testenv.Run(t, &testenv.Config{
	RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
	EnableKafka: true,
	RouterOptions: []core.Option{
		core.WithModulesConfig(cfg.Modules),
		core.WithCustomModules(&start_subscription.StartSubscriptionModule{}),
	},
	LogObservation: testenv.LogObservationConfig{
		Enabled: true,
		LogLevel: zapcore.InfoLevel,
	},
}, func(t *testing.T, xEnv *testenv.Environment) {
	var subscriptionOne struct {
		employeeUpdatedMyKafka struct {
			ID float64 `graphql:"id"`
			Details struct {
				Forename string `graphql:"forename"`
				Surname string `graphql:"surname"`
			} `graphql:"details"`
		} `graphql:"employeeUpdatedMyKafka(employeeID: $employeeID)"`
	}

	surl := xEnv.GraphQLWebSocketSubscriptionURL()
	client := graphql.NewSubscriptionClient(surl)

	vars := map[string]interface{}{
		"employeeID": 3,
	}
	type kafkaSubscriptionArgs struct {
		dataValue []byte
		errValue error
	}
	subscriptionArgsCh := make(chan kafkaSubscriptionArgs, 1)
	subscriptionOneID, err := client.Subscribe(&subscriptionOne, vars, func(dataValue []byte, errValue error) error {
		subscriptionArgsCh <- kafkaSubscriptionArgs{
			dataValue: dataValue,
			errValue: errValue,
		}
		return nil
	})
	require.NoError(t, err)
	require.NotEmpty(t, subscriptionOneID)

	clientRunCh := make(chan error)
	go func() {
		clientRunCh <- client.Run()
	}()

	xEnv.WaitForSubscriptionCount(1, time.Second*10)
	<-callbackCalled
	xEnv.WaitForSubscriptionCount(0, time.Second*10)

@coderabbitai coderabbitai bot Nov 18, 2025

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Unbuffered callbackCalled can deadlock the router goroutine running the hook

Here the hook does:

callbackCalled := make(chan bool)

Callback: func(ctx core.SubscriptionOnStartHandlerContext) error {
    callbackCalled <- true
    return core.NewHttpGraphqlError(...)
}
...
xEnv.WaitForSubscriptionCount(1, time.Second*10)
<-callbackCalled

If the router invokes Callback before the test goroutine reaches <-callbackCalled, the send on the unbuffered channel can block the router goroutine. Depending on the internal ordering, this can prevent the subscription from ever fully registering, so WaitForSubscriptionCount(1, ...) may never succeed, leading to a hang.

Use a buffered channel (and ideally a timeout-based wait) so the hook is never blocked by the test’s receive. For example:

-       callbackCalled := make(chan bool)
+       callbackCalled := make(chan bool, 1)
...
-                   Callback: func(ctx core.SubscriptionOnStartHandlerContext) error {
-                       callbackCalled <- true
-                       return core.NewHttpGraphqlError("subscription closed", http.StatusText(http.StatusOK), http.StatusOK)
-                   },
+                   Callback: func(ctx core.SubscriptionOnStartHandlerContext) error {
+                       callbackCalled <- true
+                       return core.NewHttpGraphqlError("subscription closed", http.StatusText(http.StatusOK), http.StatusOK)
+                   },
...
-           xEnv.WaitForSubscriptionCount(1, time.Second*10)
-           <-callbackCalled
+           xEnv.WaitForSubscriptionCount(1, time.Second*10)
+           testenv.AwaitChannelWithT(t, 10*time.Second, callbackCalled, func(t *testing.T, called bool) {
+               require.True(t, called)
+           }, "StartSubscription callback was not invoked")
            xEnv.WaitForSubscriptionCount(0, time.Second*10)

This keeps the assertion while removing the risk that the hook blocks the router.



Use a buffered channel and timeout-protected receive for callbackCalled

The unbuffered channel at line 188 can deadlock the router goroutine if the callback executes before the test reaches the blocking receive at line 251. Since the callback is invoked by the router during subscription setup, and the test flow involves asynchronous goroutines, the timing is non-deterministic.

Change line 188 to buffer the channel: callbackCalled := make(chan bool, 1)

Replace the blocking receive at line 251 with the timeout-protected helper already used throughout the codebase:

testenv.AwaitChannelWithT(t, 10*time.Second, callbackCalled, func(t *testing.T, called bool) {
    require.True(t, called)
}, "StartSubscription callback was not invoked")

This prevents the router goroutine from blocking and aligns with the pattern used in 60+ other tests in the repository.
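The difference between the two channel kinds can be checked in isolation. The sketch below uses a hypothetical `trySend` helper with a non-blocking select to show when a plain send would block; it does not involve the router or testenv.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
// A false result means a plain `ch <- true` would have blocked at this point.
func trySend(ch chan bool) bool {
	select {
	case ch <- true:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan bool)
	buffered := make(chan bool, 1)

	// No goroutine is receiving on either channel yet.
	fmt.Println("unbuffered send would proceed:", trySend(unbuffered))
	fmt.Println("buffered send would proceed:", trySend(buffered))
	// The buffer holds one value, so a second send would block again.
	fmt.Println("second buffered send would proceed:", trySend(buffered))
}
```

A capacity of 1 is enough here only because the callback sends exactly once per test run.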

🤖 Prompt for AI Agents
In router-tests/modules/start_subscription_test.go around lines 188 to 253, the
test creates an unbuffered channel callbackCalled which can deadlock if the
router invokes the callback before the test's blocking receive; change the
channel creation to a buffered channel (capacity 1) and replace the direct
blocking receive with the repo's timeout-protected helper
testenv.AwaitChannelWithT(t, 10*time.Second, callbackCalled, ...) to assert the
callback was invoked, using the provided failure message "StartSubscription
callback was not invoked".

Contributor Author

Fixed


@dkorittki, thank you for addressing part of the issue! I can see you've correctly replaced the blocking receive with testenv.AwaitChannelWithT at line 260, which is great. However, the channel at line 194 is still unbuffered:

callbackCalled := make(chan bool)  // Still unbuffered

This needs to be:

callbackCalled := make(chan bool, 1)  // Buffered with capacity 1

Without the buffer, the router goroutine can still deadlock when it executes the callback at line 199 (callbackCalled <- true) if the test hasn't yet reached the AwaitChannelWithT call. The buffer ensures the send never blocks, regardless of timing.

Could you please add the buffer capacity to complete the fix?



Contributor Author

Using an unbuffered channel is done on purpose. It ensures that the hook, which writes to the channel, blocks until the test itself reads from the channel (line 260). We need this blocking so that we can verify in line 259 that the client connected, and in line 263 that the client was disconnected because of the error returned by the hook.
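A stripped-down sketch of this handshake, without the router: the goroutine stands in for the hook, and `awaitWithTimeout` is a hypothetical helper that only imitates the idea of a timeout-protected receive. It is not the testenv implementation.

```go
package main

import (
	"fmt"
	"time"
)

// awaitWithTimeout is a hypothetical helper, loosely modelled on the idea of
// a timeout-protected receive; it is not the testenv implementation.
func awaitWithTimeout(ch <-chan bool, timeout time.Duration) (bool, error) {
	select {
	case v := <-ch:
		return v, nil
	case <-time.After(timeout):
		return false, fmt.Errorf("no value received within %s", timeout)
	}
}

// runHandshake starts a stand-in "hook" that blocks on an unbuffered send
// and only finishes once the caller receives. It returns whether the hook
// was still blocked right before the receive, plus any await error.
func runHandshake() (blockedBeforeReceive bool, err error) {
	callbackCalled := make(chan bool) // unbuffered on purpose
	hookReturned := make(chan struct{})

	go func() {
		callbackCalled <- true // blocks until the caller receives
		close(hookReturned)
	}()

	// Stand-in for "verify the client is connected" while the hook blocks.
	select {
	case <-hookReturned:
		blockedBeforeReceive = false
	default:
		blockedBeforeReceive = true
	}

	if _, err = awaitWithTimeout(callbackCalled, time.Second); err != nil {
		return blockedBeforeReceive, err
	}
	<-hookReturned // the hook can only return after the receive
	return blockedBeforeReceive, nil
}

func main() {
	blocked, err := runHandshake()
	fmt.Println("hook blocked before receive:", blocked, "error:", err)
}
```

The unbuffered send is what guarantees the ordering: the hook cannot return its error before the test has reached the receive.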


@dkorittki, thank you for the clarification! I understand your reasoning now—you're using the blocking behavior to ensure the hook has executed before verifying the disconnection.

Your approach assumes a specific timing: that the subscription count increments to 1 before the SubscriptionOnStart hook completes. If this assumption holds, your test sequence works correctly:

  1. Subscription registers → count becomes 1
  2. Hook blocks on callbackCalled <- true
  3. Line 259: WaitForSubscriptionCount(1) succeeds
  4. Line 260: AwaitChannelWithT reads from channel → hook unblocks and returns error
  5. Line 263: WaitForSubscriptionCount(0) verifies closure

Since you've already added AwaitChannelWithT at line 260, the test has timeout protection—if the timing assumption is violated, the test fails rather than hanging indefinitely, which is acceptable.

However, this timing dependency is implicit and could be fragile if the subscription lifecycle implementation changes. For robustness, I'd suggest one of two approaches:

  1. Keep unbuffered + add a comment documenting the timing assumption:

    // Unbuffered channel ensures hook blocks until we verify subscription is registered
    // This synchronization depends on subscription count incrementing before SubscriptionOnStart completes
    callbackCalled := make(chan bool)

  2. Use buffered channel for explicit ordering without timing dependency:

    callbackCalled := make(chan bool, 1)  // Hook never blocks
    xEnv.WaitForSubscriptionCount(1, time.Second*10)  // Verify subscription registered
    testenv.AwaitChannelWithT(t, 10*time.Second, callbackCalled, ...)  // Verify hook was called
    xEnv.WaitForSubscriptionCount(0, time.Second*10)  // Verify closure

Both are valid—option 1 preserves your synchronization intent, while option 2 is more defensive against implementation changes. Your call based on whether you want to strictly enforce the ordering or just verify it happened.
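
The difference between the two options comes down to whether a send can complete without a receiver. A minimal, standalone sketch of that behaviour follows; `sendCompletes` and its 50ms window are hypothetical and are not part of the router or its test helpers.

```go
package main

import (
	"fmt"
	"time"
)

// sendCompletes is a hypothetical helper. It reports whether a send on ch
// finishes within a short window while nobody is receiving from ch.
func sendCompletes(ch chan bool) bool {
	done := make(chan struct{})
	go func() {
		ch <- true // blocks on an unbuffered channel until a receiver is ready
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(50 * time.Millisecond): // arbitrary window for the sketch
		return false
	}
}

func main() {
	unbuffered := make(chan bool)
	buffered := make(chan bool, 1)

	fmt.Println("unbuffered send completed:", sendCompletes(unbuffered))
	fmt.Println("buffered send completed:", sendCompletes(buffered))

	<-unbuffered // receiving releases the goroutine that is still blocked
}
```

With the unbuffered channel the sender (the hook, in the test under discussion) stays parked until the test reads, which is the synchronization the author relies on. With a buffer of one, the sender moves on immediately and the test only learns that the hook ran.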


🧠 Learnings used
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:100-108
Timestamp: 2025-08-28T09:18:10.121Z
Learning: In router-tests/http_subscriptions_test.go heartbeat tests, the message ordering should remain strict with data messages followed by heartbeat messages, as the timing is deterministic and known by design in the Cosmo router implementation.

Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2329
File: router/pkg/pubsub/datasource/subscription_event_updater.go:86-88
Timestamp: 2025-11-13T10:10:47.680Z
Learning: In router/pkg/pubsub/datasource/subscription_event_updater.go, the SetHooks method is intentionally designed to only replace hook handlers, not reconfigure timeout or semaphore settings. The timeout and semaphore fields are meant to be set once during construction via NewSubscriptionEventUpdater and remain immutable. If different timeout or concurrency settings are needed, a new updater instance should be created rather than modifying the existing one.

Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.

Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.

Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2172
File: router/core/graph_server.go:0-0
Timestamp: 2025-09-17T20:55:39.456Z
Learning: The Initialize method in router/internal/retrytransport/manager.go has been updated to properly handle feature-flag-only subgraphs by collecting subgraphs from both routerConfig.GetSubgraphs() and routerConfig.FeatureFlagConfigs.ConfigByFeatureFlagName, ensuring all subgraphs receive retry configuration.

core.NewHttpGraphqlError() is not a good fit for describing
an error in hook processing. It requires HTTP-specific codes
and status texts, which do not map well onto subscriptions.
I created a new error type for this, StreamHandlerError, which lets you
pass a message. It can be returned from a hook:

  return core.StreamHandlerError{Message: "my hook error"}

and it gets sent to the subscription client:

  {"id":"1","type":"error","payload":[{"message":"my hook error"}]}

Afterwards we close the connection. This behaviour remains unchanged.
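
To illustrate the shape of this, here is a standalone sketch that turns a message-only error into the frame shown above. The types `hookError`, `errorPayload`, and `errorFrame` and the function `frameFor` are hypothetical stand-ins; they are not the router's actual types, and the real serialization path in the router may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// hookError is a hypothetical stand-in for core.StreamHandlerError:
// it carries only a message, no HTTP status code or status text.
type hookError struct {
	Message string
}

func (e hookError) Error() string { return e.Message }

// errorPayload and errorFrame mirror the JSON frame quoted in the commit message.
type errorPayload struct {
	Message string `json:"message"`
}

type errorFrame struct {
	ID      string         `json:"id"`
	Type    string         `json:"type"`
	Payload []errorPayload `json:"payload"`
}

// frameFor builds the error frame for subscription id from err.
func frameFor(id string, err error) (string, error) {
	b, marshalErr := json.Marshal(errorFrame{
		ID:      id,
		Type:    "error",
		Payload: []errorPayload{{Message: err.Error()}},
	})
	if marshalErr != nil {
		return "", marshalErr
	}
	return string(b), nil
}

func main() {
	frame, err := frameFor("1", hookError{Message: "my hook error"})
	if err != nil {
		panic(err)
	}
	fmt.Println(frame)
}
```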

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (2)
router-tests/modules/start_subscription_test.go (2)

188-251: Unbuffered callbackCalled channel still creates deadlock risk

The unbuffered channel at line 188 can deadlock if the hook callback executes before the test's blocking receive at line 251. The hook runs during subscription setup (before or during WaitForSubscriptionCount), and the timing makes it likely the send at line 195 occurs before line 251, blocking the router goroutine indefinitely.

Buffer the channel and use timeout-protected receive:

-callbackCalled := make(chan bool)
+callbackCalled := make(chan bool, 1)

At line 251, replace the blocking receive:

 xEnv.WaitForSubscriptionCount(1, time.Second*10)
-<-callbackCalled
+testenv.AwaitChannelWithT(t, 10*time.Second, callbackCalled, func(t *testing.T, called bool) {
+    require.True(t, called)
+}, "StartSubscription callback was not invoked")
 xEnv.WaitForSubscriptionCount(0, time.Second*10)

328-371: Unbuffered subscriptionOneArgsCh creates deadlock risk and ineffective assertion

Line 328 creates an unbuffered channel but the test never receives from it (only subscriptionTwoArgsCh is consumed at line 357). If the subscription callback at lines 330-334 executes unexpectedly, the send will block forever. The cleanup assertion at line 371 using require.Len(..., 0) is meaningless because len() on an unbuffered channel always returns 0—it cannot detect whether the callback was invoked.

Buffer the channel so unexpected sends are detectable:

-subscriptionOneArgsCh := make(chan kafkaSubscriptionArgs)
+subscriptionOneArgsCh := make(chan kafkaSubscriptionArgs, 1)

The cleanup assertion at line 371 becomes meaningful with the buffer:

 t.Cleanup(func() {
-    require.Len(t, subscriptionOneArgsCh, 0)
+    require.Len(t, subscriptionOneArgsCh, 0, "unexpected event delivered to non-target subscription")
 })
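
The point about `len()` can be checked in isolation. The sketch below uses a hypothetical helper, `pendingAfterSend`, that attempts one non-blocking send and then reports `len(ch)`; it is not taken from the test file.

```go
package main

import "fmt"

// pendingAfterSend is a hypothetical helper: it tries one non-blocking send
// on a channel with the given capacity and returns how many values are queued.
func pendingAfterSend(capacity int) int {
	ch := make(chan int, capacity)
	select {
	case ch <- 1:
		// The value was stored in the buffer.
	default:
		// No buffer space and no receiver: the send cannot proceed.
	}
	return len(ch)
}

func main() {
	fmt.Println("unbuffered:", pendingAfterSend(0)) // an unbuffered channel never holds queued values
	fmt.Println("buffered:  ", pendingAfterSend(1))
}
```

An unbuffered channel has nowhere to queue a value, so a length assertion on it cannot observe a stray send; a buffered one can.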
🧹 Nitpick comments (5)
adr/cosmo-streams-v1.md (4)

24-25: Fix wordiness and grammar in hook descriptions.

Line 24 uses "prior to" which is unnecessarily wordy; line 25 uses "going to" when "will" is more appropriate for certain actions.

Apply this diff:

-    StreamReceiveEventHandler: Triggered for each client/subscription when a batch of events is received from the provider, prior to delivery.
-    StreamPublishEventHandler: Called each time a batch of events is going to be sent to the provider.
+    StreamReceiveEventHandler: Triggered for each client/subscription when a batch of events is received from the provider, before delivery.
+    StreamPublishEventHandler: Called each time a batch of events will be sent to the provider.

179-179: Fix compound adjective hyphenation and text inconsistency.

Two instances of missing hyphens in compound adjectives:

  • Line 179: "Federation compatible Router events" should be "Federation-compatible Router events"
  • Line 185: "backwards compatible way" should be "backward-compatible way" (also corrects spelling; "backward" is standard for "backward-compatible")

Apply this diff:

-    Data mapping: Transforming events data from the format that could be used by the external system to/from Federation compatible Router events
+    Data mapping: Transforming events data from the format that could be used by the external system to/from Federation-compatible Router events
     - **Event filtering**: Filtering events using custom logic
     - **Event creation**: Creating new events from scratch using `ctx.NewEvent(data)` method available in all handler contexts
     
     ## Backwards Compatibility
     
-    The new hooks can be integrated in the router in a fully backwards compatible way.
+    The new hooks can be integrated in the router in a fully backward-compatible way.

Also applies to: 185-185


195-195: Fix Markdown heading and emphasis style.

Line 195 uses emphasis (__All examples...__) instead of proper Markdown heading syntax. Additionally, the strong style uses underscores instead of asterisks, which is inconsistent with the rest of the document.

Apply this diff:

-__All examples reflect the current implementation and match the actual API__
+## All examples reflect the current implementation and match the actual API

249-252: Replace hard tabs with spaces in code examples.

The code examples contain hard tabs (indentation at lines 249, 250, 251, 252, 254, 388, 389, 390, 391, 397, 398, 405, 406) that should be replaced with spaces for consistency with the rest of the Markdown document and to ensure proper rendering across different editors and platforms.

Ensure all indentation in the code blocks uses spaces (typically 4 or 8 per indent level) instead of tab characters. This is flagged by markdownlint rule MD010 (no-hard-tabs).

Also applies to: 254-254, 388-391, 397-398, 405-406

router-tests/modules/start_subscription_test.go (1)

426-461: Unbuffered channel with ineffective cleanup assertion

The unbuffered channel at line 426 makes the cleanup assertion at line 461 meaningless. After receiving the expected error message at lines 446-450, require.Len(t, subscriptionOneArgsCh, 0) always passes because len() on an unbuffered channel always returns 0. The assertion cannot detect if additional unexpected messages were sent.

Buffer the channel to make the cleanup assertion meaningful:

-subscriptionOneArgsCh := make(chan kafkaSubscriptionArgs)
+subscriptionOneArgsCh := make(chan kafkaSubscriptionArgs, 2)

The cleanup check at line 461 will now properly detect unexpected additional messages.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f07f077 and 65d5d55.

📒 Files selected for processing (5)
  • adr/cosmo-streams-v1.md (1 hunks)
  • router-tests/modules/start_subscription_test.go (1 hunks)
  • router/core/errors.go (3 hunks)
  • router/core/graphql_handler.go (2 hunks)
  • router/core/subscriptions_modules.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • router/core/errors.go
  • router/core/subscriptions_modules.go
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
📚 Learning: 2025-08-28T09:18:10.121Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:100-108
Timestamp: 2025-08-28T09:18:10.121Z
Learning: In router-tests/http_subscriptions_test.go heartbeat tests, the message ordering should remain strict with data messages followed by heartbeat messages, as the timing is deterministic and known by design in the Cosmo router implementation.

Applied to files:

  • router-tests/modules/start_subscription_test.go
📚 Learning: 2025-08-28T09:17:49.477Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.

Applied to files:

  • router-tests/modules/start_subscription_test.go
  • adr/cosmo-streams-v1.md
📚 Learning: 2025-10-01T20:39:16.113Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.

Applied to files:

  • router-tests/modules/start_subscription_test.go
📚 Learning: 2025-11-13T10:10:47.680Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2329
File: router/pkg/pubsub/datasource/subscription_event_updater.go:86-88
Timestamp: 2025-11-13T10:10:47.680Z
Learning: In router/pkg/pubsub/datasource/subscription_event_updater.go, the SetHooks method is intentionally designed to only replace hook handlers, not reconfigure timeout or semaphore settings. The timeout and semaphore fields are meant to be set once during construction via NewSubscriptionEventUpdater and remain immutable. If different timeout or concurrency settings are needed, a new updater instance should be created rather than modifying the existing one.

Applied to files:

  • router-tests/modules/start_subscription_test.go
📚 Learning: 2025-11-16T11:52:34.064Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2273
File: router/pkg/pubsub/redis/engine_datasource.go:37-42
Timestamp: 2025-11-16T11:52:34.064Z
Learning: In router/pkg/pubsub/redis/engine_datasource.go (and similar files for kafka/nats), the MutableEvent.GetData() method intentionally returns the internal Data slice without cloning, while Event.GetData() clones the slice. This is by design: MutableEvent is designed to be mutable (so callers can modify the data), while Event is immutable (cloning prevents modification). This difference is part of the Cosmo Streams v1 hook interface design.

Applied to files:

  • adr/cosmo-streams-v1.md
🧬 Code graph analysis (2)
router-tests/modules/start_subscription_test.go (7)
router-tests/testenv/testenv.go (3)
  • Run (105-122)
  • LogObservationConfig (388-391)
  • Environment (1733-1769)
router-tests/modules/start-subscription/module.go (1)
  • StartSubscriptionModule (13-17)
router/core/router.go (3)
  • Option (172-172)
  • WithModulesConfig (1679-1683)
  • WithCustomModules (1786-1790)
router-tests/testenv/utils.go (1)
  • AwaitChannelWithT (10-19)
router/core/subscriptions_modules.go (2)
  • SubscriptionOnStartHandlerContext (15-53)
  • StreamHandlerError (448-451)
router/pkg/pubsub/kafka/engine_datasource.go (2)
  • SubscriptionEventConfiguration (91-95)
  • MutableEvent (62-66)
router/pkg/pubsub/datasource/provider.go (1)
  • SubscriptionEventConfiguration (101-105)
router/core/graphql_handler.go (2)
router/core/subscriptions_modules.go (1)
  • StreamHandlerError (448-451)
router/core/errors.go (1)
  • Extensions (49-55)
🪛 LanguageTool
adr/cosmo-streams-v1.md

[style] ~24-~24: ‘prior to’ might be wordy. Consider a shorter alternative.
Context: ...f events is received from the provider, prior to delivery. - StreamPublishEventHandler...

(EN_WORDINESS_PREMIUM_PRIOR_TO)


[style] ~25-~25: Use ‘will’ instead of ‘going to’ if the following action is certain.
Context: ...er`: Called each time a batch of events is going to be sent to the provider. ```go // STRU...

(GOING_TO_WILL)


[grammar] ~179-~179: Use a hyphen to join words.
Context: ...y the external system to/from Federation compatible Router events - **Event filte...

(QB_NEW_EN_HYPHEN)


[uncategorized] ~185-~185: If this is a compound adjective that modifies the following noun, use a hyphen.
Context: ... be integrated in the router in a fully backwards compatible way. When the new module system will b...

(EN_COMPOUND_ADJECTIVE_INTERNAL)

🪛 markdownlint-cli2 (0.18.1)
adr/cosmo-streams-v1.md

195-195: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


195-195: Strong style
Expected: asterisk; Actual: underscore

(MD050, strong-style)


195-195: Strong style
Expected: asterisk; Actual: underscore

(MD050, strong-style)


249-249: Hard tabs
Column: 1

(MD010, no-hard-tabs)


250-250: Hard tabs
Column: 1

(MD010, no-hard-tabs)


251-251: Hard tabs
Column: 1

(MD010, no-hard-tabs)


252-252: Hard tabs
Column: 1

(MD010, no-hard-tabs)


254-254: Hard tabs
Column: 1

(MD010, no-hard-tabs)


388-388: Hard tabs
Column: 1

(MD010, no-hard-tabs)


389-389: Hard tabs
Column: 1

(MD010, no-hard-tabs)


390-390: Hard tabs
Column: 1

(MD010, no-hard-tabs)


391-391: Hard tabs
Column: 1

(MD010, no-hard-tabs)


397-397: Hard tabs
Column: 1

(MD010, no-hard-tabs)


397-397: Hard tabs
Column: 6

(MD010, no-hard-tabs)


398-398: Hard tabs
Column: 1

(MD010, no-hard-tabs)


405-405: Hard tabs
Column: 1

(MD010, no-hard-tabs)


405-405: Hard tabs
Column: 6

(MD010, no-hard-tabs)


406-406: Hard tabs
Column: 1

(MD010, no-hard-tabs)

🔇 Additional comments (2)
adr/cosmo-streams-v1.md (1)

64-68: Clarify MutableStreamEvent.GetData() behavior relative to cloning.

The ADR states that StreamEvent.GetData() returns a copy (line 58 & 167), but doesn't explicitly document whether MutableStreamEvent.GetData() inherits this cloning behavior or behaves differently. Based on the design rationale in section "Immutable vs Mutable events," clarify this behavior to prevent confusion for module developers implementing these interfaces.

Consider adding a note like:

// MutableStreamEvent is a StreamEvent that can be modified.
type MutableStreamEvent interface {
    StreamEvent
    // SetData sets the payload data for this event
    SetData([]byte)
    // Note: GetData() is inherited from StreamEvent and returns a copy,
    // or specify actual behavior if it differs (e.g., returns internal slice for performance).
}

If MutableStreamEvent.GetData() intentionally returns an uncloned internal slice for performance (as indicated by implementation notes), the ADR should explicitly document this design choice alongside the rationale in section "Immutable vs Mutable events."
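
For readers of the ADR, the difference between the two behaviours can be sketched as follows. The types `immutableEvent` and `mutableEvent` are hypothetical simplifications, assuming (per the learning recorded for router/pkg/pubsub/redis/engine_datasource.go) that the immutable variant clones and the mutable variant returns the internal slice.

```go
package main

import "fmt"

// immutableEvent is a hypothetical stand-in for an immutable stream event.
type immutableEvent struct{ data []byte }

// GetData returns a copy, so callers cannot change the event through it.
func (e immutableEvent) GetData() []byte {
	return append([]byte(nil), e.data...)
}

// mutableEvent is a hypothetical stand-in for a mutable stream event.
type mutableEvent struct{ data []byte }

// GetData returns the internal slice, so writes by the caller are visible.
func (e *mutableEvent) GetData() []byte { return e.data }

func main() {
	im := immutableEvent{data: []byte("abc")}
	im.GetData()[0] = 'X'
	fmt.Println(string(im.data)) // prints "abc": the write went to a copy

	mu := &mutableEvent{data: []byte("abc")}
	mu.GetData()[0] = 'X'
	fmt.Println(string(mu.data)) // prints "Xbc": the write hit the internal slice
}
```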

router/core/graphql_handler.go (1)

393-432: All error type definitions and methods are correctly implemented.

Verification confirms:

  • httpGraphqlError type has all required methods: Message(), ExtensionCode(), and StatusCode() at router/core/http_graphql_error.go:34-42
  • Error type constants errorTypeEDFSHookError and errorTypeHttpError are properly defined in router/core/errors.go:38-39
  • Error classification logic in router/core/errors.go correctly maps StreamHandlerError → errorTypeEDFSHookError (line 83-84) and httpGraphqlError → errorTypeHttpError (line 99-100)
  • Tests exercise StreamHandlerError paths in router-tests/modules/start_subscription_test.go

However, I could not find explicit tests for the cast failure defensive paths at lines 395 and 419. While the classification logic ensures casts should succeed in normal operation, verify that your test suite includes edge cases and comprehensive error scenario coverage to ensure the defensive error handling works as intended.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

♻️ Duplicate comments (2)
router-tests/modules/stream_publish_test.go (1)

31-38: Fix typo in explanatory comment

The comment has a small typo: “garantueed” → “guaranteed”.

-		// By default events are immutable in Cosmo Streams hooks, because it is not garantueed they aren't
+		// By default events are immutable in Cosmo Streams hooks, because it is not guaranteed they aren't
router-tests/modules/stream_receive_test.go (1)

208-217: Fix typo in immutability comment

Same typo as noted previously: “garantueed” → “guaranteed”.

-		// By default events are immutable in Cosmo Streams hooks, because it is not garantueed they aren't
+		// By default events are immutable in Cosmo Streams hooks, because it is not guaranteed they aren't
🧹 Nitpick comments (2)
router-tests/modules/stream-receive/module.go (2)

14-18: Consider making HookCallCount an embedded atomic.Int32 instead of a pointer

Using a *atomic.Int32 forces callers to manage allocation and nil checks, and it becomes easier to accidentally swap the pointer after the router starts, which can introduce data races. An embedded value keeps the counter always available and concurrency‑safe with a simpler call site.

Example refactor:

 type StreamReceiveModule struct {
 	Logger        *zap.Logger
 	Callback      func(ctx core.StreamReceiveEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error)
-	HookCallCount *atomic.Int32 // Counter to track how many times the hook is called
+	HookCallCount atomic.Int32 // Counter to track how many times the hook is called
 }

and in OnReceiveEvents:

-	if m.HookCallCount != nil {
-		m.HookCallCount.Add(1)
-	}
+	m.HookCallCount.Add(1)

Tests can still read the counter via m.HookCallCount.Load(), or take its address if they need a pointer.
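
A standalone sketch of the embedded-value approach, assuming Go 1.19 or later for `atomic.Int32`. The type `hookCounter` and the function `countConcurrentCalls` are hypothetical and only stand in for the test module.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// hookCounter is a hypothetical stand-in for the test module.
// The zero value of atomic.Int32 is ready to use, so no allocation
// or nil check is needed before calling Add.
type hookCounter struct {
	calls atomic.Int32
}

// onReceive uses a pointer receiver so the counter is never copied.
func (c *hookCounter) onReceive() { c.calls.Add(1) }

// countConcurrentCalls invokes the hook from n goroutines and returns the total.
func countConcurrentCalls(n int) int32 {
	var c hookCounter
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.onReceive()
		}()
	}
	wg.Wait()
	return c.calls.Load()
}

func main() {
	fmt.Println(countConcurrentCalls(50))
}
```

One caveat with this design: a struct holding an `atomic.Int32` by value should be passed by pointer, since copying it would also copy the counter.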


27-41: Optionally prefer the request‑scoped logger from the handler context

The hook implementation looks good and aligns with the StreamReceiveEventHandler contract. For per‑subscription diagnostics, you might get richer logs (request ID, operation name, tenant, etc.) by logging via ctx.Logger() when available, and only falling back to the module logger:

-func (m *StreamReceiveModule) OnReceiveEvents(ctx core.StreamReceiveEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) {
-	if m.Logger != nil {
-		m.Logger.Info("Stream Hook has been run")
-	}
+func (m *StreamReceiveModule) OnReceiveEvents(ctx core.StreamReceiveEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) {
+	if logger := ctx.Logger(); logger != nil {
+		logger.Info("Stream Hook has been run")
+	} else if m.Logger != nil {
+		m.Logger.Info("Stream Hook has been run")
+	}

If you intentionally want only module‑level logging here, you can ignore this; the current behavior is still correct.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 65d5d55 and e1bf8cd.

📒 Files selected for processing (13)
  • router-tests/modules/start-subscription/module.go (1 hunks)
  • router-tests/modules/start_subscription_test.go (1 hunks)
  • router-tests/modules/stream-publish/module.go (1 hunks)
  • router-tests/modules/stream-receive/module.go (1 hunks)
  • router-tests/modules/stream_publish_test.go (1 hunks)
  • router-tests/modules/stream_receive_test.go (1 hunks)
  • router/core/router.go (3 hunks)
  • router/core/supervisor_instance.go (2 hunks)
  • router/pkg/config/config.go (1 hunks)
  • router/pkg/config/config.schema.json (1 hunks)
  • router/pkg/config/fixtures/full.yaml (1 hunks)
  • router/pkg/config/testdata/config_defaults.json (1 hunks)
  • router/pkg/config/testdata/config_full.json (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (7)
  • router/core/supervisor_instance.go
  • router/pkg/config/config.schema.json
  • router-tests/modules/start_subscription_test.go
  • router-tests/modules/start-subscription/module.go
  • router/pkg/config/fixtures/full.yaml
  • router/pkg/config/testdata/config_full.json
  • router-tests/modules/stream-publish/module.go
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
📚 Learning: 2025-11-13T10:10:47.680Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2329
File: router/pkg/pubsub/datasource/subscription_event_updater.go:86-88
Timestamp: 2025-11-13T10:10:47.680Z
Learning: In router/pkg/pubsub/datasource/subscription_event_updater.go, the SetHooks method is intentionally designed to only replace hook handlers, not reconfigure timeout or semaphore settings. The timeout and semaphore fields are meant to be set once during construction via NewSubscriptionEventUpdater and remain immutable. If different timeout or concurrency settings are needed, a new updater instance should be created rather than modifying the existing one.

Applied to files:

  • router/pkg/config/testdata/config_defaults.json
  • router-tests/modules/stream_publish_test.go
  • router/core/router.go
  • router-tests/modules/stream_receive_test.go
📚 Learning: 2025-09-17T20:55:39.456Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2172
File: router/core/graph_server.go:0-0
Timestamp: 2025-09-17T20:55:39.456Z
Learning: The Initialize method in router/internal/retrytransport/manager.go has been updated to properly handle feature-flag-only subgraphs by collecting subgraphs from both routerConfig.GetSubgraphs() and routerConfig.FeatureFlagConfigs.ConfigByFeatureFlagName, ensuring all subgraphs receive retry configuration.

Applied to files:

  • router/pkg/config/testdata/config_defaults.json
  • router/core/router.go
📚 Learning: 2025-08-28T09:17:49.477Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.

Applied to files:

  • router-tests/modules/stream-receive/module.go
📚 Learning: 2025-08-28T09:18:10.121Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:100-108
Timestamp: 2025-08-28T09:18:10.121Z
Learning: In router-tests/http_subscriptions_test.go heartbeat tests, the message ordering should remain strict with data messages followed by heartbeat messages, as the timing is deterministic and known by design in the Cosmo router implementation.

Applied to files:

  • router-tests/modules/stream_publish_test.go
  • router/core/router.go
  • router-tests/modules/stream_receive_test.go
📚 Learning: 2025-10-01T20:39:16.113Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.

Applied to files:

  • router-tests/modules/stream_receive_test.go
🧬 Code graph analysis (4)
router-tests/modules/stream-receive/module.go (3)
router/core/subscriptions_modules.go (2)
  • StreamReceiveEventHandlerContext (269-292)
  • StreamReceiveEventHandler (294-304)
router/pkg/pubsub/datasource/provider.go (1)
  • StreamEvents (61-63)
router/core/modules.go (3)
  • ModuleContext (160-164)
  • Module (52-54)
  • ModuleInfo (44-50)
router-tests/modules/stream_publish_test.go (9)
router-tests/testenv/testenv.go (5)
  • Run (105-122)
  • ConfigWithEdfsKafkaJSONTemplate (85-85)
  • Environment (1733-1769)
  • ConfigWithEdfsNatsJSONTemplate (87-87)
  • ConfigWithEdfsRedisJSONTemplate (89-89)
router-tests/modules/stream-publish/module.go (1)
  • PublishModule (14-18)
router/core/subscriptions_modules.go (1)
  • StreamPublishEventHandlerContext (306-327)
router/pkg/pubsub/datasource/provider.go (5)
  • StreamEvents (61-63)
  • MutableStreamEvent (94-98)
  • PublishEventConfiguration (108-112)
  • StreamEvent (86-91)
  • NewStreamEvents (81-83)
router/core/router.go (3)
  • Option (172-172)
  • WithModulesConfig (1679-1683)
  • WithCustomModules (1786-1790)
router-tests/events/utils.go (3)
  • KafkaEnsureTopicExists (15-32)
  • ReadKafkaMessages (55-71)
  • ReadRedisMessages (96-121)
router/pkg/pubsub/kafka/engine_datasource.go (2)
  • PublishEventConfiguration (188-192)
  • MutableEvent (62-66)
router/pkg/pubsub/redis/engine_datasource.go (2)
  • PublishEventConfiguration (141-145)
  • MutableEvent (35-37)
router/pkg/pubsub/nats/engine_datasource.go (1)
  • MutableEvent (42-45)
router/core/router.go (2)
router/core/subscriptions_modules.go (3)
  • SubscriptionOnStartHandler (202-206)
  • StreamPublishEventHandler (329-336)
  • StreamReceiveEventHandler (294-304)
router/pkg/config/config.go (1)
  • StreamsHandlerConfiguration (648-650)
router-tests/modules/stream_receive_test.go (8)
router-tests/testenv/testenv.go (2)
  • Run (105-122)
  • Environment (1733-1769)
router-tests/modules/stream-receive/module.go (1)
  • StreamReceiveModule (14-18)
router/pkg/config/config.go (4)
  • Config (1017-1092)
  • Graph (24-29)
  • StreamsHandlerConfiguration (648-650)
  • OnReceiveEventsConfiguration (652-655)
router/core/router.go (2)
  • Option (172-172)
  • WithStreamsHandlerConfiguration (2143-2148)
router-tests/events/utils.go (2)
  • KafkaEnsureTopicExists (15-32)
  • ProduceKafkaMessage (34-53)
router-tests/testenv/utils.go (1)
  • AwaitChannelWithT (10-19)
router/core/subscriptions_modules.go (1)
  • StreamReceiveEventHandlerContext (269-292)
router/pkg/pubsub/datasource/provider.go (4)
  • StreamEvents (61-63)
  • StreamEvent (86-91)
  • NewStreamEvents (81-83)
  • MutableStreamEvent (94-98)
🔇 Additional comments (7)
router/pkg/config/testdata/config_defaults.json (1)

293-305: Defaults for events.handlers.on_receive_events look consistent

The JSON defaults (MaxConcurrentHandlers: 100, HandlerTimeout: 5000000000) match the Go defaults in OnReceiveEventsConfiguration and router.NewRouter, so the config surface is coherent for both YAML/env and JSON-based tests.

router/core/router.go (3)

256-262: Subscription receive‑hook defaults align with config; semantics are “zero means default”

The defaulting of maxConcurrentHandlers to 100 and timeout to 5*time.Second when zero matches OnReceiveEventsConfiguration’s envDefaults and avoids accidentally running with unbounded/zero values when options are omitted or constructed programmatically.
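
The "zero means default" rule can be sketched like this. The struct `receiveHookConfig` and the function `withDefaults` are hypothetical; only the default values (100 handlers, 5 seconds) come from the review above.

```go
package main

import (
	"fmt"
	"time"
)

// receiveHookConfig is a hypothetical stand-in for the receive-hook settings.
type receiveHookConfig struct {
	MaxConcurrentHandlers int
	HandlerTimeout        time.Duration
}

// withDefaults replaces zero values with the defaults named in the review.
func withDefaults(cfg receiveHookConfig) receiveHookConfig {
	if cfg.MaxConcurrentHandlers == 0 {
		cfg.MaxConcurrentHandlers = 100
	}
	if cfg.HandlerTimeout == 0 {
		cfg.HandlerTimeout = 5 * time.Second
	}
	return cfg
}

func main() {
	fmt.Printf("%+v\n", withDefaults(receiveHookConfig{}))
	fmt.Printf("%+v\n", withDefaults(receiveHookConfig{MaxConcurrentHandlers: 10}))
}
```

A consequence of this convention is that a caller cannot ask for a literal zero; an explicit "no timeout" would need a separate flag or sentinel.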


686-696: Module stream hooks are wired cleanly into subscriptionHooks

Registering SubscriptionOnStartHandler, StreamPublishEventHandler, and StreamReceiveEventHandler here mirrors the existing module wiring (middleware, pre/post origin, trace propagators) and cleanly centralizes all stream‑related hooks under subscriptionHooks.


2143-2148: WithStreamsHandlerConfiguration correctly keeps timeout/concurrency construction‑time only

This option writes into r.subscriptionHooks.onReceiveEvents before the updater is created, keeping timeout/semaphore configuration construction‑time only and leaving SetHooks to replace handlers only, as intended. Also consistent with the “zero means default” behavior enforced in NewRouter.

router/pkg/config/config.go (1)

637-655: Streams handler config is well‑shaped and matches router expectations

EventsConfiguration.Handlers.OnReceiveEvents with MaxConcurrentHandlers and HandlerTimeout (envDefault 100 / 5s) lines up with the router’s WithStreamsHandlerConfiguration and subscriptionHooks defaults, giving a single, consistent configuration surface for receive‑hook concurrency/timeout.
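
As a rough illustration of what a `MaxConcurrentHandlers` limit implies, the sketch below bounds concurrent work with a buffered channel used as a semaphore. This is an assumption about the general technique, not a copy of the updater's implementation; `maxObservedConcurrency` is a hypothetical function.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// maxObservedConcurrency runs `tasks` simulated handlers while allowing at most
// `limit` of them to run at once, and returns the highest concurrency observed.
func maxObservedConcurrency(tasks, limit int) int32 {
	sem := make(chan struct{}, limit) // buffered channel acts as a semaphore
	var running, peak atomic.Int32
	var wg sync.WaitGroup

	for i := 0; i < tasks; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot, blocks when limit is reached
			defer func() { <-sem }() // release the slot when the handler returns

			n := running.Add(1)
			for { // record a new peak if this is the highest count seen so far
				p := peak.Load()
				if n <= p || peak.CompareAndSwap(p, n) {
					break
				}
			}
			time.Sleep(time.Millisecond) // simulated handler work
			running.Add(-1)
		}()
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	fmt.Println("peak concurrency:", maxObservedConcurrency(20, 3))
}
```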

router-tests/modules/stream_receive_test.go (2)

842-967: Timeout test for out‑of‑order delivery is well‑designed

The combination of hookDelay > hookTimeout, three produced events, collection of IDs, and explicit assertions about the last/first positions plus the timeout log message provides strong coverage of the timeout semantics and the documented “may arrive out of order” trade‑off.


663-688: for loop over tc.numSubscribers requires Go 1.22+

for i := range tc.numSubscribers ranges over an int, which is valid only from Go 1.22 onward; with an older toolchain (or an older go directive in go.mod) it does not compile. If the test module must build with such a toolchain, use a standard counted loop instead:

-					for i := range tc.numSubscribers {
+					for i := 0; i < tc.numSubscribers; i++ {
 						clients[i] = graphql.NewSubscriptionClient(surl)
 						clientRunChs[i] = make(chan error)
 						subscriptionArgsChs[i] = make(chan kafkaSubscriptionArgs, 1)
@@
 						go func(i int) {
 							clientRunChs[i] <- clients[i].Run()
 						}(i)
 					}

With this fix, the test will correctly spin up the expected number of clients and exercise the MaxConcurrentHandlers behavior.

Also applies to: 789-808

⛔ Skipped due to learnings
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2216
File: router-tests/authentication_test.go:847-847
Timestamp: 2025-09-17T13:51:50.823Z
Learning: Go 1.22 introduced the ability to range over integers, making `for range n` syntax valid for iterating n times in loops.

Removed a switch case, which is not reachable anymore.
Also removed an early return in case a type assertion did not work.
This way we ensure the response is actually sent either way.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
router-tests/modules/start_subscription_test.go (1)

154-154: Buffer all callback coordination channels to prevent deadlocks.

Multiple test helper channels are created unbuffered, creating race conditions where subscription callbacks can block if they attempt to send before the test goroutine reaches the corresponding AwaitChannelWithT receive:

  • Line 154: subscriptionArgsCh in "Test StartSubscription write event works"
  • Line 194: callbackCalled in "Test StartSubscription closes client connection when hook returns an error"
  • Lines 341, 352: subscriptionOneArgsCh and subscriptionTwoArgsCh in "Test event emitted byStartSubscription sends event only to the client that triggered the hook"
  • Line 441: subscriptionOneArgsCh in "Test StartSubscription error is propagated to the client"
  • Line 593: subscriptionOneArgsCh in "Test StartSubscription hook is called for engine subscription and write event works"
  • Line 684: subscriptionOneArgsCh in "Test when StartSubscription hook returns an error, the OnOriginResponse hook is not called"

The timing window between when client.Run() activates subscriptions and when the test reaches AwaitChannelWithT is non-deterministic. If a callback fires during this gap and attempts to send to an unbuffered channel, it blocks the router goroutine indefinitely, causing test hangs.

Note: Past review comments for lines 194 and 341 were marked "Fixed", but the code still shows unbuffered channels.

Apply buffering to all callback coordination channels:

 // Test 2, line 154:
-subscriptionArgsCh := make(chan kafkaSubscriptionArgs)
+subscriptionArgsCh := make(chan kafkaSubscriptionArgs, 1)

 // Test 3, line 194:
-callbackCalled := make(chan bool)
+callbackCalled := make(chan bool, 1)

 // Test 4, lines 341 and 352:
-subscriptionOneArgsCh := make(chan kafkaSubscriptionArgs)
+subscriptionOneArgsCh := make(chan kafkaSubscriptionArgs, 1)
 ...
-subscriptionTwoArgsCh := make(chan kafkaSubscriptionArgs)
+subscriptionTwoArgsCh := make(chan kafkaSubscriptionArgs, 1)

 // Test 5, line 441:
-subscriptionOneArgsCh := make(chan kafkaSubscriptionArgs)
+subscriptionOneArgsCh := make(chan kafkaSubscriptionArgs, 1)

 // Test 7, line 593:
-subscriptionOneArgsCh := make(chan subscriptionArgs)
+subscriptionOneArgsCh := make(chan subscriptionArgs, 1)

 // Test 8, line 684:
-subscriptionOneArgsCh := make(chan subscriptionArgs)
+subscriptionOneArgsCh := make(chan subscriptionArgs, 1)

Buffering makes the require.Len(..., 0) cleanup assertions at lines 383 and 475 meaningful—they can now detect unexpected callback invocations.

Also applies to: 194-194, 341-341, 352-352, 441-441, 593-593, 684-684
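
The blocking behaviour described in this comment can be reproduced in isolation. The sketch below is not taken from the test suite: `fireCallback` is a hypothetical stand-in for a subscription callback that sends to a coordination channel before the test has started receiving.

```go
package main

import (
	"fmt"
	"time"
)

// fireCallback simulates a callback that reports to the test through ch
// while nobody is receiving yet. It returns true if the send completed
// within the deadline, false if the callback was still blocked.
func fireCallback(ch chan string) bool {
	done := make(chan struct{})
	go func() {
		ch <- "event" // blocks on an unbuffered channel until a receiver is ready
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(50 * time.Millisecond):
		return false // the sending goroutine is still stuck on the send
	}
}

func main() {
	fmt.Println("unbuffered completed:", fireCallback(make(chan string)))
	fmt.Println("buffered completed:  ", fireCallback(make(chan string, 1)))
}
```

A buffer of 1 only covers a single early send; a callback that may fire several times before the test reads needs a larger buffer or a non-blocking send.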

🧹 Nitpick comments (1)
router-tests/modules/start_subscription_test.go (1)

279-279: Fix typo in test name.

"byStartSubscription" should be "by StartSubscription" (add space before "StartSubscription").

-t.Run("Test event emitted byStartSubscription sends event only to the client that triggered the hook", func(t *testing.T) {
+t.Run("Test event emitted by StartSubscription sends event only to the client that triggered the hook", func(t *testing.T) {
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 544bf72 and 1cabe5d.

📒 Files selected for processing (1)
  • router-tests/modules/start_subscription_test.go (1 hunks)
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
📚 Learning: 2025-08-28T09:18:10.121Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:100-108
Timestamp: 2025-08-28T09:18:10.121Z
Learning: In router-tests/http_subscriptions_test.go heartbeat tests, the message ordering should remain strict with data messages followed by heartbeat messages, as the timing is deterministic and known by design in the Cosmo router implementation.

Applied to files:

  • router-tests/modules/start_subscription_test.go
📚 Learning: 2025-08-28T09:17:49.477Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.

Applied to files:

  • router-tests/modules/start_subscription_test.go
📚 Learning: 2025-10-01T20:39:16.113Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.

Applied to files:

  • router-tests/modules/start_subscription_test.go
📚 Learning: 2025-09-17T20:55:39.456Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2172
File: router/core/graph_server.go:0-0
Timestamp: 2025-09-17T20:55:39.456Z
Learning: The Initialize method in router/internal/retrytransport/manager.go has been updated to properly handle feature-flag-only subgraphs by collecting subgraphs from both routerConfig.GetSubgraphs() and routerConfig.FeatureFlagConfigs.ConfigByFeatureFlagName, ensuring all subgraphs receive retry configuration.

Applied to files:

  • router-tests/modules/start_subscription_test.go
📚 Learning: 2025-11-13T10:10:47.680Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2329
File: router/pkg/pubsub/datasource/subscription_event_updater.go:86-88
Timestamp: 2025-11-13T10:10:47.680Z
Learning: In router/pkg/pubsub/datasource/subscription_event_updater.go, the SetHooks method is intentionally designed to only replace hook handlers, not reconfigure timeout or semaphore settings. The timeout and semaphore fields are meant to be set once during construction via NewSubscriptionEventUpdater and remain immutable. If different timeout or concurrency settings are needed, a new updater instance should be created rather than modifying the existing one.

Applied to files:

  • router-tests/modules/start_subscription_test.go
🧬 Code graph analysis (1)
router-tests/modules/start_subscription_test.go (6)
router-tests/testenv/testenv.go (4)
  • Run (105-122)
  • ConfigWithEdfsKafkaJSONTemplate (85-85)
  • LogObservationConfig (388-391)
  • Environment (1733-1769)
router-tests/modules/start-subscription/module.go (1)
  • StartSubscriptionModule (14-19)
router/core/router.go (3)
  • Option (172-172)
  • WithModulesConfig (1679-1683)
  • WithCustomModules (1786-1790)
router-tests/testenv/utils.go (1)
  • AwaitChannelWithT (10-19)
router/core/subscriptions_modules.go (2)
  • SubscriptionOnStartHandlerContext (15-53)
  • StreamHandlerError (448-451)
router/pkg/pubsub/kafka/engine_datasource.go (2)
  • SubscriptionEventConfiguration (91-95)
  • MutableEvent (62-66)


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

♻️ Duplicate comments (5)
router-tests/modules/stream_publish_test.go (1)

91-93: Update comment to match actual assertion.

The comment mentions verifying the hook via a log message, but the test actually checks HookCallCount. Additionally, line 92 has a grammatical error: "as been" should be "has been".

Apply this diff:

-		// This test verifies that the publish hook is invoked when a mutation with a Kafka publish is executed.
-		// It confirms the hook as been called by checking a log message, which is written by the custom module
-		// used in these tests right before the actual hook is being called.
+		// This test verifies that the publish hook is invoked when a mutation with a Kafka publish is executed.
+		// It confirms the hook has been called by asserting HookCallCount on the custom module.
router-tests/modules/stream_receive_test.go (4)

212-213: Fix typo in comment explaining immutability

The comment should say “guaranteed” instead of “garantueed” to keep the documentation clear:

-		// By default events are immutable in Cosmo Streams hooks, because it is not garantueed they aren't
+		// By default events are immutable in Cosmo Streams hooks, because it is not guaranteed they aren't

381-390: Avoid sharing one subscription struct between two clients (data race)

Both client and client2 subscribe using &subscriptionQuery, so two independent subscription goroutines can concurrently unmarshal into the same target struct, which is a data race (even if this test only inspects the raw JSON bytes).

Give each client its own target value:

 			var subscriptionQuery struct {
 				employeeUpdatedMyKafka struct {
 					ID      float64 `graphql:"id"`
 					Details struct {
 						Forename string `graphql:"forename"`
 						Surname  string `graphql:"surname"`
 					} `graphql:"details"`
 				} `graphql:"employeeUpdatedMyKafka(employeeID: 3)"`
 			}
+			var subscriptionQueryAuth struct {
+				employeeUpdatedMyKafka struct {
+					ID      float64 `graphql:"id"`
+					Details struct {
+						Forename string `graphql:"forename"`
+						Surname  string `graphql:"surname"`
+					} `graphql:"details"`
+				} `graphql:"employeeUpdatedMyKafka(employeeID: 3)"`
+			}
@@
 			subscriptionOneID, err := client.Subscribe(&subscriptionQuery, nil, func(dataValue []byte, errValue error) error {
@@
-			subscriptionTwoID, err := client2.Subscribe(&subscriptionQuery, nil, func(dataValue []byte, errValue error) error {
+			subscriptionTwoID, err := client2.Subscribe(&subscriptionQueryAuth, nil, func(dataValue []byte, errValue error) error {

Also applies to: 423-433


623-630: Unbuffered subscriptionArgsCh is never read and can block callback

In the batch-error test, subscriptionArgsCh is unbuffered and written from the subscription callback but never read in the test, so any callback invocation would block and potentially hang client.Run().

Either remove the channel entirely and use a no-op callback, or make it buffered and non-blocking. For minimal change:

-			subscriptionArgsCh := make(chan kafkaSubscriptionArgs)
-			subscriptionOneID, err := client.Subscribe(&subscriptionOne, nil, func(dataValue []byte, errValue error) error {
-				subscriptionArgsCh <- kafkaSubscriptionArgs{
-					dataValue: dataValue,
-					errValue:  errValue,
-				}
-				return nil
-			})
+			subscriptionArgsCh := make(chan kafkaSubscriptionArgs, 1) // best-effort capture if callback fires
+			subscriptionOneID, err := client.Subscribe(&subscriptionOne, nil, func(dataValue []byte, errValue error) error {
+				select {
+				case subscriptionArgsCh <- kafkaSubscriptionArgs{dataValue: dataValue, errValue: errValue}:
+				default:
+					// channel not observed in this test; drop to avoid blocking
+				}
+				return nil
+			})

(Or, if you don’t care about the callback here, just replace it with a no-op that ignores dataValue/errValue and drop the channel entirely.)
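
The non-blocking send suggested in the diff can be factored into a small helper. This is a sketch with a hypothetical name (`trySend`), shown on an `int` channel for brevity.

```go
package main

import "fmt"

// trySend attempts a send without blocking and reports whether the value
// was accepted. When the buffer is full (or there is no receiver on an
// unbuffered channel), the default case runs and the value is dropped.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // buffer has room
	fmt.Println(trySend(ch, 2)) // buffer is full, value is dropped
}
```

Dropping is acceptable here only because the test never inspects the channel; a test that asserts on received values should block with a timeout instead.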


773-781: Range over int requires Go 1.22+; shared subscription struct is a data race

In the concurrency test:

  1. for i := range tc.numSubscribers ranges over an int, which compiles only with Go 1.22 or later.
  2. All clients subscribe with the same subscriptionQuery instance, so multiple goroutines can concurrently decode into the same struct, causing a data race.

Define a per-client subscription struct slice and iterate over the clients slice:

-					var subscriptionQuery struct {
-						employeeUpdatedMyKafka struct {
-							ID      float64 `graphql:"id"`
-							Details struct {
-								Forename string `graphql:"forename"`
-								Surname  string `graphql:"surname"`
-							} `graphql:"details"`
-						} `graphql:"employeeUpdatedMyKafka(employeeID: 3)"`
-					}
-
-					surl := xEnv.GraphQLWebSocketSubscriptionURL()
-					client := graphql.NewSubscriptionClient(surl)
-
-					clients := make([]*graphql.SubscriptionClient, tc.numSubscribers)
-					clientRunChs := make([]chan error, tc.numSubscribers)
-					subscriptionArgsChs := make([]chan kafkaSubscriptionArgs, tc.numSubscribers)
-
-					for i := range tc.numSubscribers {
+					type subscriptionQuery struct {
+						employeeUpdatedMyKafka struct {
+							ID      float64 `graphql:"id"`
+							Details struct {
+								Forename string `graphql:"forename"`
+								Surname  string `graphql:"surname"`
+							} `graphql:"details"`
+						} `graphql:"employeeUpdatedMyKafka(employeeID: 3)"`
+					}
+
+					surl := xEnv.GraphQLWebSocketSubscriptionURL()
+
+					clients := make([]*graphql.SubscriptionClient, tc.numSubscribers)
+					clientRunChs := make([]chan error, tc.numSubscribers)
+					subscriptionArgsChs := make([]chan kafkaSubscriptionArgs, tc.numSubscribers)
+					subscriptions := make([]subscriptionQuery, tc.numSubscribers)
+
+					for i := range clients {
 						clients[i] = graphql.NewSubscriptionClient(surl)
 						clientRunChs[i] = make(chan error)
 						subscriptionArgsChs[i] = make(chan kafkaSubscriptionArgs, 1)
@@
-						subscriptionID, err := clients[i].Subscribe(&subscriptionQuery, nil, func(dataValue []byte, errValue error) error {
+						subscriptionID, err := clients[i].Subscribe(&subscriptions[i], nil, func(dataValue []byte, errValue error) error {

This keeps the loop compiling on toolchains older than Go 1.22 and gives each client its own stable decode target.

Also applies to: 785-808

🧹 Nitpick comments (1)
router-tests/modules/stream_publish_test.go (1)

134-137: Improve comment clarity.

The comment on lines 136-137 reads as a sentence fragment ("when ... but ..."), so it never states the expected outcome. Rephrase it as a complete sentence.

Apply this diff:

-		// This test verifies that when the publish hook returns events and an error
-		// but the messages are still sent to Kafka.
+		// This test verifies that when the publish hook returns both events and an error,
+		// the messages are still sent to Kafka.
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1cabe5d and d2946a1.

📒 Files selected for processing (2)
  • router-tests/modules/stream_publish_test.go (1 hunks)
  • router-tests/modules/stream_receive_test.go (1 hunks)
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
📚 Learning: 2025-08-28T09:18:10.121Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:100-108
Timestamp: 2025-08-28T09:18:10.121Z
Learning: In router-tests/http_subscriptions_test.go heartbeat tests, the message ordering should remain strict with data messages followed by heartbeat messages, as the timing is deterministic and known by design in the Cosmo router implementation.

Applied to files:

  • router-tests/modules/stream_receive_test.go
  • router-tests/modules/stream_publish_test.go
📚 Learning: 2025-08-28T09:17:49.477Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.

Applied to files:

  • router-tests/modules/stream_receive_test.go
📚 Learning: 2025-10-01T20:39:16.113Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.

Applied to files:

  • router-tests/modules/stream_receive_test.go
  • router-tests/modules/stream_publish_test.go
📚 Learning: 2025-11-16T11:52:34.064Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2273
File: router/pkg/pubsub/redis/engine_datasource.go:37-42
Timestamp: 2025-11-16T11:52:34.064Z
Learning: In router/pkg/pubsub/redis/engine_datasource.go (and similar files for kafka/nats), the MutableEvent.GetData() method intentionally returns the internal Data slice without cloning, while Event.GetData() clones the slice. This is by design: MutableEvent is designed to be mutable (so callers can modify the data), while Event is immutable (cloning prevents modification). This difference is part of the Cosmo Streams v1 hook interface design.

Applied to files:

  • router-tests/modules/stream_receive_test.go
📚 Learning: 2025-11-13T10:10:47.680Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2329
File: router/pkg/pubsub/datasource/subscription_event_updater.go:86-88
Timestamp: 2025-11-13T10:10:47.680Z
Learning: In router/pkg/pubsub/datasource/subscription_event_updater.go, the SetHooks method is intentionally designed to only replace hook handlers, not reconfigure timeout or semaphore settings. The timeout and semaphore fields are meant to be set once during construction via NewSubscriptionEventUpdater and remain immutable. If different timeout or concurrency settings are needed, a new updater instance should be created rather than modifying the existing one.

Applied to files:

  • router-tests/modules/stream_receive_test.go
  • router-tests/modules/stream_publish_test.go
🧬 Code graph analysis (2)
router-tests/modules/stream_receive_test.go (10)
router-tests/testenv/testenv.go (3)
  • Run (105-122)
  • LogObservationConfig (388-391)
  • Environment (1733-1769)
router-tests/modules/stream-receive/module.go (1)
  • StreamReceiveModule (14-18)
router/pkg/config/config.go (4)
  • Config (1017-1092)
  • Graph (24-29)
  • StreamsHandlerConfiguration (648-650)
  • OnReceiveEventsConfiguration (652-655)
router/core/router.go (5)
  • Option (172-172)
  • WithModulesConfig (1679-1683)
  • WithCustomModules (1786-1790)
  • WithAccessController (1839-1843)
  • WithStreamsHandlerConfiguration (2143-2148)
router-tests/events/utils.go (2)
  • KafkaEnsureTopicExists (15-32)
  • ProduceKafkaMessage (34-53)
router-tests/testenv/utils.go (1)
  • AwaitChannelWithT (10-19)
router/core/subscriptions_modules.go (1)
  • StreamReceiveEventHandlerContext (269-292)
router/pkg/pubsub/datasource/provider.go (4)
  • StreamEvents (61-63)
  • StreamEvent (86-91)
  • NewStreamEvents (81-83)
  • MutableStreamEvent (94-98)
router/pkg/authentication/jwks_token_decoder.go (3)
  • NewJwksTokenDecoder (78-241)
  • JWKSConfig (44-56)
  • TokenDecoder (21-23)
router/pkg/authentication/http_header_authenticator.go (2)
  • HttpHeaderAuthenticatorOptions (81-89)
  • NewHttpHeaderAuthenticator (93-113)
router-tests/modules/stream_publish_test.go (10)
router-tests/testenv/testenv.go (2)
  • Run (105-122)
  • Environment (1733-1769)
router-tests/modules/stream-publish/module.go (1)
  • PublishModule (14-18)
router/core/subscriptions_modules.go (1)
  • StreamPublishEventHandlerContext (306-327)
router/pkg/pubsub/datasource/provider.go (5)
  • StreamEvents (61-63)
  • MutableStreamEvent (94-98)
  • PublishEventConfiguration (108-112)
  • StreamEvent (86-91)
  • NewStreamEvents (81-83)
router/core/router.go (3)
  • Option (172-172)
  • WithModulesConfig (1679-1683)
  • WithCustomModules (1786-1790)
router-tests/events/utils.go (3)
  • KafkaEnsureTopicExists (15-32)
  • ReadKafkaMessages (55-71)
  • ReadRedisMessages (96-121)
router-tests/testenv/utils.go (1)
  • AwaitChannelWithT (10-19)
router/pkg/pubsub/kafka/engine_datasource.go (2)
  • PublishEventConfiguration (188-192)
  • MutableEvent (62-66)
router/pkg/pubsub/redis/engine_datasource.go (2)
  • PublishEventConfiguration (141-145)
  • MutableEvent (35-37)
router/pkg/pubsub/nats/engine_datasource.go (1)
  • MutableEvent (42-45)
🔇 Additional comments (6)
router-tests/modules/stream_publish_test.go (2)

282-285: LGTM: Proper channel handling with timeout.

The use of testenv.AwaitChannelWithT correctly addresses the race condition that would occur with require.Len on a channel. This ensures the test waits for the message to arrive or times out gracefully.
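
A helper of this kind can be sketched as a receive with a deadline. The code below is an assumption about the general shape, not the implementation of testenv.AwaitChannelWithT; `awaitChannel` and `errTimeout` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTimeout = errors.New("timed out waiting for channel")

// awaitChannel waits for one value on ch and gives up after timeout.
// Unlike checking len(ch), it does not depend on the value already being
// queued at the moment the assertion runs.
func awaitChannel[T any](ch <-chan T, timeout time.Duration) (T, error) {
	select {
	case v := <-ch:
		return v, nil
	case <-time.After(timeout):
		var zero T
		return zero, errTimeout
	}
}

func main() {
	ch := make(chan string, 1)
	ch <- "message"
	fmt.Println(awaitChannel(ch, time.Second))

	// Nothing is ever sent on this channel, so the call times out.
	fmt.Println(awaitChannel(make(chan string), 10*time.Millisecond))
}
```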


289-360: LGTM: Comprehensive test of Kafka-specific mutations.

This test effectively demonstrates the publish hook's ability to:

  • Clone events to obtain mutable copies
  • Access operation variables from the context
  • Modify event data
  • Add broker-specific features (Kafka headers)

The test logic is correct and provides good coverage of the Kafka publish hook functionality.

router-tests/modules/stream_receive_test.go (4)

37-115: Subscription happy-path wiring looks correct

Test setup (Kafka topic provisioning, single WS client, subscription, event production, and shutdown) is coherent and exercises the receive hook path end-to-end; the HookCallCount assertion gives a clear signal that the hook is actually invoked.


117-206: Event cloning & mutation pattern is correct

The hook’s use of event.Clone() followed by SetData and wrapping in datasource.NewStreamEvents cleanly validates the intended “immutable by default, mutable via clone” contract, and the assertions confirm only the transformed payload reaches the client.
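
The "immutable by default, mutable via clone" contract can be illustrated with simplified stand-in types. The names `Event`, `MutableEvent`, `Clone`, `GetData`, and `SetData` follow the identifiers mentioned in this review, but the definitions below are assumptions written for illustration, not the router's types; `redact` and `newEvent` are hypothetical.

```go
package main

import "fmt"

// Event is a read-only event: the payload is unexported and GetData
// returns a copy, so callers cannot modify the original bytes.
type Event struct{ data []byte }

func (e Event) GetData() []byte { return append([]byte(nil), e.data...) }

// Clone returns an independent, mutable copy of the event.
func (e Event) Clone() *MutableEvent {
	return &MutableEvent{Data: append([]byte(nil), e.data...)}
}

// MutableEvent exposes its payload directly so hooks can change it.
type MutableEvent struct{ Data []byte }

func (m *MutableEvent) GetData() []byte  { return m.Data }
func (m *MutableEvent) SetData(d []byte) { m.Data = d }

// newEvent builds an Event from a string payload.
func newEvent(s string) Event { return Event{data: []byte(s)} }

// redact is a hypothetical hook body: it clones the incoming event and
// rewrites the clone, leaving the shared original untouched.
func redact(in Event) *MutableEvent {
	out := in.Clone()
	out.SetData([]byte(`{"id":3,"redacted":true}`))
	return out
}

func main() {
	orig := newEvent(`{"id":3}`)
	out := redact(orig)
	fmt.Println(string(orig.GetData()))
	fmt.Println(string(out.GetData()))
}
```

Cloning costs one copy per mutated event, which is the price for letting several subscriptions read the same original safely.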


471-572: Header-based hook condition is well-structured

Using http.CanonicalHeaderKey and reading ctx.Request().Header[customHeader] to gate event mutation is a clean way to validate header propagation into the hook context; the test flow (single client with custom header, one Kafka event, JSON assertion, hook-count check) is tight and clear.
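
Canonicalization matters here because direct map access on http.Header does not normalize the key. A short sketch using only the standard library; `newHeader` and `headerValues` are hypothetical helpers and the header name is made up.

```go
package main

import (
	"fmt"
	"net/http"
)

// newHeader builds a header map with one entry; Set stores the key in
// canonical form (for example "X-Custom-Header").
func newHeader(key, value string) http.Header {
	h := http.Header{}
	h.Set(key, value)
	return h
}

// headerValues reads a header by direct map access, which only finds the
// entry when the key is canonicalized first.
func headerValues(h http.Header, name string) []string {
	return h[http.CanonicalHeaderKey(name)]
}

func main() {
	h := newHeader("x-custom-header", "on")
	fmt.Println(len(h["x-custom-header"]))          // raw lowercase key misses the entry
	fmt.Println(headerValues(h, "x-custom-header")) // canonical key finds it
}
```

h.Get and h.Values canonicalize internally, so they are an alternative when indexing the map directly is not required.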


842-968: Out-of-order delivery & timeout behavior is well exercised

The last test nicely validates the timeout behavior: delaying only the first hook call, collecting IDs via a buffered channel and AwaitChannelWithT, asserting that all three IDs arrive plus that ID 1 arrives last, and checking for the specific timeout log message and hook call count. This gives good confidence in the trade-off semantics where timeouts can reorder events.
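
How a hook timeout can reorder events is easiest to see in a reduced model. The sketch below is an assumption about the general mechanism, not the router's updater: `process` runs a hook per event, waits at most `timeout` for it, and otherwise moves on while the slow hook finishes in the background. `process` and `slowFirst` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// process runs hook for every id and records the id once its hook returns.
// It waits at most timeout per event; a hook that takes longer keeps
// running in the background, so its event is recorded later than events
// that arrived after it.
func process(ids []int, hook func(int), timeout time.Duration) []int {
	var (
		mu        sync.Mutex
		delivered []int
		wg        sync.WaitGroup
	)
	for _, id := range ids {
		done := make(chan struct{})
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			hook(id)
			mu.Lock()
			delivered = append(delivered, id)
			mu.Unlock()
			close(done)
		}(id)
		select {
		case <-done:
		case <-time.After(timeout):
			// Stop waiting and continue with the next event.
		}
	}
	wg.Wait() // let background hooks finish before reading the result
	return delivered
}

// slowFirst delays only the hook call for event 1, as in the test above.
func slowFirst(id int) {
	if id == 1 {
		time.Sleep(300 * time.Millisecond)
	}
}

func main() {
	fmt.Println(process([]int{1, 2, 3}, slowFirst, 50*time.Millisecond))
}
```

In this model the delayed event arrives last, which matches the assertion in the test that ID 1 is delivered after the other two.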


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
router/core/errors.go (1)

38-38: Consider renaming for consistency with Cosmo Streams v1.

The constant uses errorTypeEDFSHookError naming while this PR implements Cosmo Streams v1 and checks for StreamHandlerError. Since the PR objectives indicate this is rebranding EDFS to Cosmo Streams, consider errorTypeStreamHookError for consistency.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d2946a1 and cdcbf93.

📒 Files selected for processing (2)
  • router-tests/modules/stream_receive_test.go (1 hunks)
  • router/core/errors.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • router-tests/modules/stream_receive_test.go
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
🧬 Code graph analysis (1)
router/core/errors.go (1)
router/core/subscriptions_modules.go (1)
  • StreamHandlerError (448-451)
🔇 Additional comments (1)
router/core/errors.go (1)

81-84: No action required—StreamHandlerError correctly implements the error interface.

Verification confirms that StreamHandlerError has the required Error() string method (router/core/subscriptions_modules.go:454-456), so the errors.As check will work as intended and properly classify hook errors.
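
The classification via errors.As can be sketched with a stand-in error type. `hookError`, `wrap`, `isHookError`, and `errPlain` are hypothetical names; the actual StreamHandlerError may differ in fields and receiver type.

```go
package main

import (
	"errors"
	"fmt"
)

// hookError is an illustrative stand-in for a hook error type. Having an
// Error() string method is what makes it satisfy the error interface.
type hookError struct{ Message string }

func (e *hookError) Error() string { return e.Message }

var errPlain = errors.New("plain failure")

// wrap adds context while keeping the original error reachable via %w.
func wrap(err error) error {
	return fmt.Errorf("subscription failed: %w", err)
}

// isHookError reports whether any error in the chain is a *hookError.
func isHookError(err error) bool {
	var target *hookError
	return errors.As(err, &target)
}

func main() {
	fmt.Println(isHookError(wrap(&hookError{Message: "denied"})))
	fmt.Println(isHookError(wrap(errPlain)))
}
```

errors.As matches on the exact target type, so a type with a pointer receiver must be looked up as a pointer, as done here.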

This actually introduces a bug, where occasionally subscription clients will get disconnected after receiving messages when we deal with lots of connecting and disconnecting clients. It depends on two contexts being regularly checked inside the Kafka topic poller and if one of them cancels, the connections would get closed. These contexts are meant to handle the lifecycle of the Kafka adapter, not subscriptions. So we should not close the subscription updater when the poller is canceled.

After this commit, the router behaves as it did before Cosmo Streams. All of this can be reevaluated, and there may be room for improvement, but for now it's better not to change this behaviour.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (6)
adr/cosmo-streams-v1.md (5)

23-26: Clarify language and naming terminology.

Several lines have minor wording issues that could be tightened:

  • Line 24: "prior to delivery" → suggest "before delivery" (more concise)
  • Line 25: "going to be sent" → suggest "will be sent" (more direct)
- `StreamReceiveEventHandler`: Triggered for each client/subscription when a batch of events is received from the provider, prior to delivery.
- `StreamPublishEventHandler`: Called each time a batch of events is going to be sent to the provider.
+ `StreamReceiveEventHandler`: Triggered for each client/subscription when a batch of events is received from the provider, before delivery.
+ `StreamPublishEventHandler`: Called each time a batch of events will be sent to the provider.

219-219: Apply hyphenation to compound adjectives.

Two instances where compound adjectives should be hyphenated for clarity:

  • Line 219: "to/from Federation compatible" → "Federation-compatible"
  • Line 225: "fully backwards compatible" → "fully backward-compatible"
- **Data mapping**: Transforming events data from the format that could be used by the external system to/from Federation compatible Router events
+ **Data mapping**: Transforming events data from the format that could be used by the external system to/from Federation-compatible Router events
- The new hooks can be integrated in the router in a fully backwards compatible way.
+ The new hooks can be integrated in the router in a fully backward-compatible way.

Also applies to: 225-225


235-235: Correct emphasis and heading formatting.

Line 235 uses underscores (__text__) for strong emphasis instead of asterisks (**text**), and treats strong text as a heading when it should be a proper Markdown heading (using ##).

- __All examples reflect the current implementation and match the actual API__
+ ## All examples reflect the current implementation and match the actual API

289-292: Replace hard tabs with spaces in example code blocks.

Additional hard tabs detected in the Go code examples (lines 289–292, 294, 428–431, 437–438, 445–446). Replace with consistent space indentation.

Also applies to: 294-294, 428-431, 437-438, 445-446


87-189: Replace hard tabs with spaces in interface definitions (lines 87–189).

Hard tabs detected in all interface method definitions and comments. Replace with spaces for consistency and portability across different editors and tools.

router/pkg/pubsub/kafka/adapter.go (1)

126-180: Consider explicitly closing the per-subscription Kafka consumer client

In Subscribe, a new kgo.Client is created per subscription and used only inside the poller goroutine. When topicPoller returns (either on shutdown or error), the goroutine just exits; the consumer client is never explicitly closed here.

To avoid potential lingering background goroutines or connections in the franz-go client, consider closing it when the poller ends:

go func() {
    defer p.closeWg.Done()
+   defer client.Close()

    err := p.topicPoller(ctx, client, updater, PollerOpts{providerId: conf.ProviderID()})
    if err != nil {
        // ...
    }
}()

If the consumer lifecycle is already managed elsewhere, this can be skipped, but as written this function appears to be the only owner of client.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cdcbf93 and 551ef84.

📒 Files selected for processing (2)
  • adr/cosmo-streams-v1.md (1 hunks)
  • router/pkg/pubsub/kafka/adapter.go (5 hunks)
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
📚 Learning: 2025-11-16T11:52:34.064Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2273
File: router/pkg/pubsub/redis/engine_datasource.go:37-42
Timestamp: 2025-11-16T11:52:34.064Z
Learning: In router/pkg/pubsub/redis/engine_datasource.go (and similar files for kafka/nats), the MutableEvent.GetData() method intentionally returns the internal Data slice without cloning, while Event.GetData() clones the slice. This is by design: MutableEvent is designed to be mutable (so callers can modify the data), while Event is immutable (cloning prevents modification). This difference is part of the Cosmo Streams v1 hook interface design.

Applied to files:

  • router/pkg/pubsub/kafka/adapter.go
  • adr/cosmo-streams-v1.md
📚 Learning: 2025-11-13T10:10:47.680Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2329
File: router/pkg/pubsub/datasource/subscription_event_updater.go:86-88
Timestamp: 2025-11-13T10:10:47.680Z
Learning: In router/pkg/pubsub/datasource/subscription_event_updater.go, the SetHooks method is intentionally designed to only replace hook handlers, not reconfigure timeout or semaphore settings. The timeout and semaphore fields are meant to be set once during construction via NewSubscriptionEventUpdater and remain immutable. If different timeout or concurrency settings are needed, a new updater instance should be created rather than modifying the existing one.

Applied to files:

  • router/pkg/pubsub/kafka/adapter.go
📚 Learning: 2025-08-28T09:17:49.477Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.

Applied to files:

  • adr/cosmo-streams-v1.md
🧬 Code graph analysis (1)
router/pkg/pubsub/kafka/adapter.go (5)
router/pkg/pubsub/datasource/provider.go (6)
  • Adapter (24-28)
  • ProviderType (52-52)
  • ProviderTypeKafka (56-56)
  • StreamEvent (86-91)
  • SubscriptionEventConfiguration (101-105)
  • PublishEventConfiguration (108-112)
router/pkg/pubsub/datasource/subscription_event_updater.go (1)
  • SubscriptionEventUpdater (21-26)
router/pkg/metric/stream_metric_store.go (3)
  • StreamsEvent (25-31)
  • ProviderType (16-16)
  • ProviderTypeKafka (19-19)
router/pkg/pubsub/kafka/engine_datasource.go (4)
  • Event (21-23)
  • MutableEvent (62-66)
  • SubscriptionEventConfiguration (91-95)
  • PublishEventConfiguration (188-192)
router/pkg/pubsub/datasource/error.go (2)
  • NewError (12-17)
  • Error (3-6)
🪛 LanguageTool
adr/cosmo-streams-v1.md

[style] ~24-~24: ‘prior to’ might be wordy. Consider a shorter alternative.
Context: ...f events is received from the provider, prior to delivery. - StreamPublishEventHandler...

(EN_WORDINESS_PREMIUM_PRIOR_TO)


[style] ~25-~25: Use ‘will’ instead of ‘going to’ if the following action is certain.
Context: ...er`: Called each time a batch of events is going to be sent to the provider. ```go // STRU...

(GOING_TO_WILL)


[grammar] ~219-~219: Use a hyphen to join words.
Context: ...y the external system to/from Federation compatible Router events - **Event filte...

(QB_NEW_EN_HYPHEN)


[uncategorized] ~225-~225: If this is a compound adjective that modifies the following noun, use a hyphen.
Context: ... be integrated in the router in a fully backwards compatible way. When the new module system will b...

(EN_COMPOUND_ADJECTIVE_INTERNAL)

🪛 markdownlint-cli2 (0.18.1)
adr/cosmo-streams-v1.md

87-123, 133-154, 170-189: Hard tabs
Column: 1

(MD010, no-hard-tabs)


235-235: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


235-235: Strong style
Expected: asterisk; Actual: underscore

(MD050, strong-style)


235-235: Strong style
Expected: asterisk; Actual: underscore

(MD050, strong-style)


289-292, 294, 428-431, 437-438, 445-446: Hard tabs
Column: 1

(MD010, no-hard-tabs)


437, 445: Hard tabs
Column: 6

(MD010, no-hard-tabs)

🔇 Additional comments (3)
adr/cosmo-streams-v1.md (1)

15-232: ADR structure and content are sound.

The document clearly articulates the design rationale, interface contracts, and immutable-vs-mutable event trade-offs. The EmitLocalEvent terminology reflects the prior rename from WriteEvent, and examples demonstrate realistic module implementation patterns (authorization, filtering, data transformation). No substantive documentation gaps identified.

router/pkg/pubsub/kafka/adapter.go (2)

23-24: Adapter compile-time assertion is correct

The var _ datasource.Adapter = (*ProviderAdapter)(nil) check is a good safeguard to keep ProviderAdapter aligned with the public datasource.Adapter surface as it evolves.
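
For readers unfamiliar with the idiom, a self-contained sketch of the same compile-time assertion. The `Adapter` interface and its two methods are assumptions made for illustration; the real `datasource.Adapter` has its own method set.

```go
package main

import "fmt"

// Adapter is a hypothetical, reduced interface used only for this sketch.
type Adapter interface {
	Startup() error
	Shutdown() error
}

// ProviderAdapter is a stand-in implementation.
type ProviderAdapter struct{ started bool }

func (p *ProviderAdapter) Startup() error  { p.started = true; return nil }
func (p *ProviderAdapter) Shutdown() error { p.started = false; return nil }

// Compile-time check: the build fails here if *ProviderAdapter ever stops
// satisfying Adapter. The typed nil pointer means nothing is allocated.
var _ Adapter = (*ProviderAdapter)(nil)

func main() {
	var a Adapter = &ProviderAdapter{}
	fmt.Println(a.Startup() == nil) // prints: true
}
```

If a method were added to the interface without a matching implementation, the compiler would report the mismatch at the assertion line instead of at a distant call site.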


51-118: topicPoller event construction and metrics wiring look consistent

The changes to topicPoller to:

  • use datasource.SubscriptionEventUpdater,
  • extract Kafka headers into a map[string][]byte,
  • wrap records as *Event{evt: &MutableEvent{...}}, and
  • emit Consume metrics with providerId from PollerOpts

are consistent with the Streams v1 StreamEvent / MutableStreamEvent model and preserve key, value, and headers as expected. No issues from a correctness or API-usage perspective.
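
The header extraction step can be sketched as below. `recordHeader` is a local stand-in modeled on a key/value record header, and `headersToMap` is a hypothetical helper name. Letting the last value win for duplicate keys is an assumption of this sketch, not something the review confirms about the adapter.

```go
package main

import "fmt"

// recordHeader is a local stand-in for a Kafka record header (key plus raw value).
type recordHeader struct {
	Key   string
	Value []byte
}

// headersToMap flattens a header list into a map[string][]byte.
// Assumption: when a key repeats, the last value wins.
func headersToMap(headers []recordHeader) map[string][]byte {
	out := make(map[string][]byte, len(headers))
	for _, h := range headers {
		out[h.Key] = h.Value
	}
	return out
}

func main() {
	m := headersToMap([]recordHeader{
		{Key: "trace-id", Value: []byte("abc")},
		{Key: "tenant", Value: []byte("t1")},
	})
	fmt.Println(len(m), string(m["trace-id"])) // prints: 2 abc
}
```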

Based on learnings

@dkorittki (Contributor, Author) commented Nov 20, 2025

Waiting for the Engine PR to be merged and a new engine version to be released. After that I will pull the latest changes from main and update the router's go.mod; then it's ready for merge.


4 participants