feat(router): add SubscriptionOnCreate hook#2972
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughAdds a ChangesSubscription On-Create Hook
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~30 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
Comment |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2972 +/- ##
===========================================
+ Coverage 46.83% 66.29% +19.45%
===========================================
Files 1115 258 -857
Lines 151058 27640 -123418
Branches 9883 0 -9883
===========================================
- Hits 70755 18324 -52431
+ Misses 78501 7842 -70659
+ Partials 1802 1474 -328
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@router/pkg/pubsub/datasource/subscription_datasource.go`:
- Around line 97-121: The SubscriptionBeforeTrigger method has unnamed return
values, which prevents the deferred panic recovery from properly propagating
errors to the caller. When the defer block catches a panic and assigns the error
to the local err variable, the function still returns the original unnamed
return values instead of the recovered error. Change the method signature to use
named return values (output for the byte slice and err for the error) so that
when the defer block sets the err variable in the panic recovery handler, it
updates the actual return values that will be returned to the caller.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 55ab7977-4282-4aed-b95d-416dfc61f88f
📒 Files selected for processing (6)
router/core/factoryresolver.gorouter/core/router.gorouter/core/router_config.gorouter/core/subscriptions_modules.gorouter/pkg/pubsub/datasource/hooks.gorouter/pkg/pubsub/datasource/subscription_datasource.go
❌ Internal Query Planner CI checks failedThe Internal Query Planner CI checks failed in the celestial repository, and this is going to stop the merge of this PR. |
There was a problem hiding this comment.
♻️ Duplicate comments (1)
router/pkg/pubsub/datasource/subscription_datasource.go (1)
97-121:⚠️ Potential issue | 🔴 Critical | ⚡ Quick winPanic recovery does not propagate errors to callers.
SubscriptionBeforeCreaterecovers panics, but with unnamed returns the recovered error isn’t returned; panic paths can exit as(nil, nil).🔧 Suggested fix
-func (s *PubSubSubscriptionDataSource[C]) SubscriptionBeforeCreate(ctx context.Context, input []byte) ([]byte, error) { +func (s *PubSubSubscriptionDataSource[C]) SubscriptionBeforeCreate(ctx context.Context, input []byte) (out []byte, err error) { + out = input if len(s.hooks.SubscriptionBeforeCreate.Handlers) == 0 { - return input, nil + return out, nil } var ( conf SubscriptionEventConfiguration - err error ) @@ switch v := r.(type) { case error: err = v default: err = fmt.Errorf("%v", v) } + out = nil } }() @@ - return mergeConfigIntoInput(input, conf) + out, err = mergeConfigIntoInput(input, conf) + return out, err }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@router/pkg/pubsub/datasource/subscription_datasource.go` around lines 97 - 121, The SubscriptionBeforeCreate method uses unnamed return values but the defer recovery block assigns to an err variable that is never returned to callers. Change the function signature to use named return values so the err variable assignment in the panic recovery defer block properly corresponds to the returned error. This ensures that errors caught from panics are actually propagated back to the caller instead of returning (nil, nil) when a panic occurs.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Duplicate comments:
In `@router/pkg/pubsub/datasource/subscription_datasource.go`:
- Around line 97-121: The SubscriptionBeforeCreate method uses unnamed return
values but the defer recovery block assigns to an err variable that is never
returned to callers. Change the function signature to use named return values so
the err variable assignment in the panic recovery defer block properly
corresponds to the returned error. This ensures that errors caught from panics
are actually propagated back to the caller instead of returning (nil, nil) when
a panic occurs.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 5ddc91f3-6cd5-4575-9135-c1ea31d73cd4
⛔ Files ignored due to path filters (1)
router/go.sumis excluded by!**/*.sum
📒 Files selected for processing (6)
router/core/factoryresolver.gorouter/core/router.gorouter/core/router_config.gorouter/core/subscriptions_modules.gorouter/pkg/pubsub/datasource/hooks.gorouter/pkg/pubsub/datasource/subscription_datasource.go
🚧 Files skipped from review as they are similar to previous changes (2)
- router/core/factoryresolver.go
- router/core/subscriptions_modules.go
c1818c6 to
a4e5564
Compare
If a hook user can't create a new config, i.e. due to missing headers etc., let a hook user be able to return an error and stop the subscription from being created.
|
@coderabbitai review |
✅ Action performedReview finished.
|
There was a problem hiding this comment.
🧹 Nitpick comments (4)
router/pkg/pubsub/datasource/subscription_datasource_test.go (1)
703-710: ⚡ Quick winStrengthen replacement-semantics assertion in
SetSubscriptionOnCreateFnstest.The test currently asserts only length (
1), which can still pass even ifSetHooksfails to replace the previous handler instance. Assert identity/behavior of the active handler after the secondSetHookscall.Proposed test hardening
dataSource.SetHooks(Hooks{ SubscriptionOnCreate: SubscriptionOnCreateHooks{ Handlers: []SubscriptionOnCreateFn{hook2}, }, }) assert.Len(t, dataSource.hooks.SubscriptionOnCreate.Handlers, 1) + assert.Same(t, hook2, dataSource.hooks.SubscriptionOnCreate.Handlers[0])🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@router/pkg/pubsub/datasource/subscription_datasource_test.go` around lines 703 - 710, The test verifies the length of handlers remains 1 after calling SetHooks with a new hook (hook2), but it does not verify that the handler was actually replaced. After the second assert.Len call in the SetHooks block, add an additional assertion to verify that the handler at index 0 in dataSource.hooks.SubscriptionOnCreate.Handlers is actually hook2 (the new handler that should have replaced the previous one). This ensures the replacement semantics are correct and not just the count.router-tests/modules/subscription_on_create_test.go (3)
690-690: 💤 Low valueRemove unnecessary blank assignment.
The constant
sharedEmployeeIDdefined at line 597 is only used in the query string at line 598. The blank assignment_ = sharedEmployeeIDat line 690 serves no purpose and can be removed to improve code clarity.♻️ Suggested fix
wg.Wait() xEnv.WaitForSubscriptionCount(0, subscriptionOnCreateTestTimeout) - - _ = sharedEmployeeID })🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@router-tests/modules/subscription_on_create_test.go` at line 690, Remove the unnecessary blank assignment statement `_ = sharedEmployeeID` at line 690 in the subscription_on_create_test.go file. Since the sharedEmployeeID constant is already used in the query string earlier in the test and the blank assignment serves no purpose, simply delete this line to improve code clarity.
463-519: ⚡ Quick winReplace manual WaitGroup pattern with
sync.WaitGroup.Go()across fan-in and fan-out tests. Based on learnings, Go 1.25+ code in this repository should usewg.Go(func())instead of manualwg.Add(n)followed bygo func() { defer wg.Done(); ... }(), as the former manages Add and Done automatically.
router-tests/modules/subscription_on_create_test.go#L463-L519: Replacevar wg sync.WaitGroup; wg.Add(2)withvar wg sync.WaitGroup, then replace bothgo func() { defer wg.Done(); ... }()blocks withwg.Go(func() { ... }).router-tests/modules/subscription_on_create_test.go#L591-L642: Apply the same transformation: removewg.Add(2)and replace bothgo func() { defer wg.Done(); ... }()blocks withwg.Go(func() { ... }).🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@router-tests/modules/subscription_on_create_test.go` around lines 463 - 519, The code uses the manual WaitGroup pattern with explicit wg.Add() and defer wg.Done() calls instead of the modern Go 1.25+ wg.Go() method. At router-tests/modules/subscription_on_create_test.go lines 463-519, remove the wg.Add(2) call and replace the two go func() { defer wg.Done(); ... }() blocks with wg.Go(func() { ... }) calls, ensuring the function bodies remain unchanged. Apply the identical transformation at router-tests/modules/subscription_on_create_test.go lines 591-642, removing wg.Add(2) and replacing both go func() { defer wg.Done(); ... }() blocks with wg.Go(func() { ... }). The wg.Go() method automatically manages Add and Done calls, eliminating boilerplate.Source: Learnings
491-492: ⚡ Quick winRemove manual
SetReadDeadlinebeforetestenv.WSReadJSONin fan-in and fan-out tests. The coding guideline specifies that manualSetReadDeadlinewith rawconn.ReadJSONshould only be used when expecting errors;testenv.WSReadJSONalready handles deadlines with retry logic, so the manual deadline is redundant.
router-tests/modules/subscription_on_create_test.go#L491-L492: Removeconn.SetReadDeadline(time.Now().Add(time.Second * 5))beforetestenv.WSReadJSON(t, conn, &complete).router-tests/modules/subscription_on_create_test.go#L517-L518: Removeconn.SetReadDeadline(time.Now().Add(time.Second * 5))beforetestenv.WSReadJSON(t, conn, &complete).router-tests/modules/subscription_on_create_test.go#L617-L618: Removeconn.SetReadDeadline(time.Now().Add(time.Second * 5))beforetestenv.WSReadJSON(t, conn, &complete).router-tests/modules/subscription_on_create_test.go#L640-L641: Removeconn.SetReadDeadline(time.Now().Add(time.Second * 5))beforetestenv.WSReadJSON(t, conn, &complete).🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@router-tests/modules/subscription_on_create_test.go` around lines 491 - 492, Remove the redundant manual `SetReadDeadline` calls in router-tests/modules/subscription_on_create_test.go at four locations before `testenv.WSReadJSON` calls, since testenv.WSReadJSON already handles deadlines with retry logic. At lines 491-492 (anchor site), remove the `conn.SetReadDeadline(time.Now().Add(time.Second * 5))` line immediately before the `testenv.WSReadJSON(t, conn, &complete)` call. Apply the same removal at lines 517-518 (sibling site) before another `testenv.WSReadJSON(t, conn, &complete)` call, at lines 617-618 (sibling site) before the same pattern, and at lines 640-641 (sibling site) before the same pattern. In each case, simply delete the SetReadDeadline line while keeping the testenv.WSReadJSON call.Source: Coding guidelines
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@router-tests/modules/subscription_on_create_test.go`:
- Line 690: Remove the unnecessary blank assignment statement `_ =
sharedEmployeeID` at line 690 in the subscription_on_create_test.go file. Since
the sharedEmployeeID constant is already used in the query string earlier in the
test and the blank assignment serves no purpose, simply delete this line to
improve code clarity.
- Around line 463-519: The code uses the manual WaitGroup pattern with explicit
wg.Add() and defer wg.Done() calls instead of the modern Go 1.25+ wg.Go()
method. At router-tests/modules/subscription_on_create_test.go lines 463-519,
remove the wg.Add(2) call and replace the two go func() { defer wg.Done(); ...
}() blocks with wg.Go(func() { ... }) calls, ensuring the function bodies remain
unchanged. Apply the identical transformation at
router-tests/modules/subscription_on_create_test.go lines 591-642, removing
wg.Add(2) and replacing both go func() { defer wg.Done(); ... }() blocks with
wg.Go(func() { ... }). The wg.Go() method automatically manages Add and Done
calls, eliminating boilerplate.
- Around line 491-492: Remove the redundant manual `SetReadDeadline` calls in
router-tests/modules/subscription_on_create_test.go at four locations before
`testenv.WSReadJSON` calls, since testenv.WSReadJSON already handles deadlines
with retry logic. At lines 491-492 (anchor site), remove the
`conn.SetReadDeadline(time.Now().Add(time.Second * 5))` line immediately before
the `testenv.WSReadJSON(t, conn, &complete)` call. Apply the same removal at
lines 517-518 (sibling site) before another `testenv.WSReadJSON(t, conn,
&complete)` call, at lines 617-618 (sibling site) before the same pattern, and
at lines 640-641 (sibling site) before the same pattern. In each case, simply
delete the SetReadDeadline line while keeping the testenv.WSReadJSON call.
In `@router/pkg/pubsub/datasource/subscription_datasource_test.go`:
- Around line 703-710: The test verifies the length of handlers remains 1 after
calling SetHooks with a new hook (hook2), but it does not verify that the
handler was actually replaced. After the second assert.Len call in the SetHooks
block, add an additional assertion to verify that the handler at index 0 in
dataSource.hooks.SubscriptionOnCreate.Handlers is actually hook2 (the new
handler that should have replaced the previous one). This ensures the
replacement semantics are correct and not just the count.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 03869d14-b864-459a-9604-e2d6a25558c3
⛔ Files ignored due to path filters (2)
router-tests/go.sumis excluded by!**/*.sumrouter/go.sumis excluded by!**/*.sum
📒 Files selected for processing (14)
docs-website/docs.jsondocs-website/router/cosmo-streams/custom-modules.mdxdocs-website/router/cosmo-streams/custom-modules/subscription-on-create.mdxrouter-tests/go.modrouter-tests/modules/subscription-on-create/module.gorouter-tests/modules/subscription_on_create_test.gorouter/core/factoryresolver.gorouter/core/router.gorouter/core/router_config.gorouter/core/subscriptions_modules.gorouter/go.modrouter/pkg/pubsub/datasource/hooks.gorouter/pkg/pubsub/datasource/subscription_datasource.gorouter/pkg/pubsub/datasource/subscription_datasource_test.go
✅ Files skipped from review due to trivial changes (1)
- docs-website/router/cosmo-streams/custom-modules.mdx
🚧 Files skipped from review as they are similar to previous changes (1)
- router-tests/go.mod
Corresponding change in engine: wundergraph/graphql-go-tools#1538
Reviewer notes
Good review starting point is here . This is where the engine calls the new hook.
The type definition for the Custom Module Handler is here.
In general this changes:
resolve.HookablePubsubDatasourceby adding the methodSubscriptionOnCreate. This method executes the proper custom modules registered by the user.General context
Adds a new Cosmo Streams Custom Module
SubscriptionOnCreate.It allows to modify the subscription event configuration. Usecases for example are overwriting the Redis channels / Kafka topics / Nats subjects programmatically.
It is very similar to the existing
SubscriptionOnStartbut there are some differences:SubscriptionOnStartruns afterMinimal usage example
I marked this new handler as experimental to be able to change the handler in the future to be able to adapt to feedback from users.
Summary by CodeRabbit
SubscriptionOnCreateHandler.Checklist
Open Source AI Manifesto
This project follows the principles of the Open Source AI Manifesto. Please ensure your contribution aligns with its principles.