Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
0ff6ef6
feat: add SubscriptionBeforeTrigger hook
dkorittki Jun 15, 2026
ee0b84b
fix: readd json string delimiters when merging input
dkorittki Jun 15, 2026
35aad3a
chore: remove error return option from hook
dkorittki Jun 15, 2026
04141bf
chore: remove unnecessary set function
dkorittki Jun 15, 2026
6a7890e
chore: mention experimental methods in godoc
dkorittki Jun 15, 2026
cf36f7d
chore: update go.mod to temporary engine version
dkorittki Jun 15, 2026
73a24cf
fix: add missing go.sum engine entry
dkorittki Jun 16, 2026
a4e5564
chore: rename hook
dkorittki Jun 16, 2026
56d1da4
chore: remove unnecessary interface guard
dkorittki Jun 16, 2026
c3f2be7
chore: update go.mod
dkorittki Jun 16, 2026
544e0f8
fix: use named returns
dkorittki Jun 16, 2026
a589e28
chore: fix go.mod
dkorittki Jun 16, 2026
00dd6f7
chore: add unit tests
dkorittki Jun 16, 2026
7cf5540
chore: add docs
dkorittki Jun 16, 2026
f926ed1
chore: add integration tests
dkorittki Jun 16, 2026
438a6c5
chore: reduce duplicate code in test file
dkorittki Jun 16, 2026
f41b257
feat: let hook return errors
dkorittki Jun 16, 2026
f600b48
Merge branch 'main' into dominik/new_subscriptionOnTrigger_hook
dkorittki Jun 16, 2026
5c0e16f
Merge branch 'main' into dominik/new_subscriptionOnTrigger_hook
dkorittki Jul 1, 2026
f0b4ab0
Merge branch 'main' into dominik/new_subscriptionOnTrigger_hook
dkorittki Jul 2, 2026
73b3756
chore: update go.mod to latest graphql-go-tools
dkorittki Jul 2, 2026
3d24f25
Merge branch 'main' into dominik/new_subscriptionOnTrigger_hook
dkorittki Jul 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs-website/docs.json
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@
"icon": "cubes",
"pages": [
"router/cosmo-streams/custom-modules",
"router/cosmo-streams/custom-modules/subscription-on-create",
"router/cosmo-streams/custom-modules/subscription-on-start",
"router/cosmo-streams/custom-modules/on-receive-event",
"router/cosmo-streams/custom-modules/on-publish-event"
Expand Down
1 change: 1 addition & 0 deletions docs-website/router/cosmo-streams/custom-modules.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Custom Modules in Cosmo Streams are available since Router version 0.266.0

The Cosmo Streams system provides three main hook interfaces that you can implement with [Custom Modules](/router/custom-modules):

- [`SubscriptionOnCreateHandler`](/router/cosmo-streams/custom-modules/subscription-on-create): Called before a subscription is registered, allows modifying the event configuration (experimental)
- [`SubscriptionOnStartHandler`](/router/cosmo-streams/custom-modules/subscription-on-start): Called when a client subscribes
- [`OnReceiveEventHandler`](/router/cosmo-streams/custom-modules/on-receive-event): Called when events are received from a message broker
- [`OnPublishEventHandler`](/router/cosmo-streams/custom-modules/on-publish-event): Called when events are going to be sent to a message broker
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
---
title: "SubscriptionOnCreate Handler"
description: "A Cosmo Streams Custom Module that lets you modify the subscription event configuration before the subscription is registered"
icon: "circle-plus"
---

<Warning>
`SubscriptionOnCreate` is an experimental hook. Its signature and behaviour may change without prior notice.
</Warning>

The `SubscriptionOnCreate` handler is a custom module hook that runs once per subscription, before the subscription is registered with the message broker. It gives you access to the subscription event configuration and lets you modify it in place.

This handler is useful for:
- **Dynamic subject routing**: Override NATS subjects, Redis channels or Kafka topics based on custom logic
- **Per-request configuration**: Override provider-level defaults for individual subscriptions

## Limitations

Compared to [`SubscriptionOnStart`](/router/cosmo-streams/custom-modules/subscription-on-start), this handler:
- Cannot emit events to the client
- Only applies to Cosmo Streams / EDFS subscriptions, not to regular GraphQL subscriptions

## Handler Interface

Implement `SubscriptionOnCreateHandler` in a [Custom Module](/router/custom-modules) to use this hook.

```go
type SubscriptionOnCreateHandler interface {
// SubscriptionOnCreate is called once before the subscription is registered.
// Mutations to the SubscriptionEventConfiguration take effect immediately.
// Returning a non-nil error aborts the subscription.
//
// This method is currently EXPERIMENTAL.
// The signature and behaviour might change without prior notice.
SubscriptionOnCreate(ctx SubscriptionOnCreateHandlerContext) error
}

type SubscriptionOnCreateHandlerContext interface {
// Request is the original request received by the router.
Request() *http.Request
// Logger is the logger for the request.
Logger() *zap.Logger
// Operation is the GraphQL operation.
Operation() OperationContext
// Authentication is the authentication for the request.
Authentication() authentication.Authentication
// SubscriptionEventConfiguration returns the current subscription event configuration.
// The returned value is a pointer; mutating it directly takes effect without any additional call.
//
// This method is currently EXPERIMENTAL.
// The signature and behaviour might change without prior notice.
SubscriptionEventConfiguration() datasource.SubscriptionEventConfiguration
}
```

## Modifying the Configuration

`SubscriptionEventConfiguration` is an interface. To access provider-specific fields you must type-assert to the concrete configuration type for your message broker.

For NATS subscriptions:

```go
import natspubsub "github.com/wundergraph/cosmo/router/pkg/pubsub/nats"

func (m *MyModule) SubscriptionOnCreate(ctx core.SubscriptionOnCreateHandlerContext) error {
cfg, ok := ctx.SubscriptionEventConfiguration().(*natspubsub.SubscriptionEventConfiguration)
if !ok {
return nil
}
// cfg.Subjects and cfg.StreamConfiguration are now directly mutable.
return nil
}
```

For Redis subscriptions:

```go
import redispubsub "github.com/wundergraph/cosmo/router/pkg/pubsub/redis"

func (m *MyModule) SubscriptionOnCreate(ctx core.SubscriptionOnCreateHandlerContext) error {
cfg, ok := ctx.SubscriptionEventConfiguration().(*redispubsub.SubscriptionEventConfiguration)
if !ok {
return nil
}
// cfg fields are now directly mutable.
return nil
}
```

Use `ctx.SubscriptionEventConfiguration().ProviderType()` to branch by provider type when your module handles multiple providers.

## Usage Example

### Dynamic NATS subject routing

The following example rewrites NATS subjects based on a tenant identifier from the request header. Each tenant is isolated to its own subject namespace.

```go
package module

import (
"fmt"

"github.com/wundergraph/cosmo/router/core"
natspubsub "github.com/wundergraph/cosmo/router/pkg/pubsub/nats"
)

func init() {
core.RegisterModule(&TenantRoutingModule{})
}

const ModuleID = "tenantRoutingModule"

type TenantRoutingModule struct{}

func (m *TenantRoutingModule) SubscriptionOnCreate(ctx core.SubscriptionOnCreateHandlerContext) error {
// Only rewrite subjects for the "employeeUpdated" subscription.
if ctx.SubscriptionEventConfiguration().RootFieldName() != "employeeUpdated" {
return nil
}

cfg, ok := ctx.SubscriptionEventConfiguration().(*natspubsub.SubscriptionEventConfiguration)
if !ok {
return nil
}

tenantID := ctx.Request().Header.Get("X-Tenant-ID")
if tenantID == "" {
return nil
}

// Prefix every subject with the tenant ID.
for i, subject := range cfg.Subjects {
cfg.Subjects[i] = fmt.Sprintf("%s.%s", tenantID, subject)
}
return nil
}

func (m *TenantRoutingModule) Module() core.ModuleInfo {
return core.ModuleInfo{
ID: ModuleID,
New: func() core.Module {
return &TenantRoutingModule{}
},
}
}

var _ core.SubscriptionOnCreateHandler = (*TenantRoutingModule)(nil)
```

### Dynamic Redis channel routing

The following example rewrites Redis channels based on a user ID extracted from the authenticated request. Each user receives events only from their own channel.

```go
package module

import (
"fmt"

"github.com/wundergraph/cosmo/router/core"
redispubsub "github.com/wundergraph/cosmo/router/pkg/pubsub/redis"
)

func init() {
core.RegisterModule(&UserChannelRoutingModule{})
}

const ModuleID = "userChannelRoutingModule"

type UserChannelRoutingModule struct{}

func (m *UserChannelRoutingModule) SubscriptionOnCreate(ctx core.SubscriptionOnCreateHandlerContext) error {
// Only rewrite channels for the "orderStatusUpdated" subscription.
if ctx.SubscriptionEventConfiguration().RootFieldName() != "orderStatusUpdated" {
return nil
}

cfg, ok := ctx.SubscriptionEventConfiguration().(*redispubsub.SubscriptionEventConfiguration)
if !ok {
return nil
}

auth := ctx.Authentication()
if auth == nil {
return nil
}

userID, ok := auth.Claims()["sub"].(string)
if !ok || userID == "" {
return nil
}

// Route each user to their own channel.
for i, channel := range cfg.Channels {
cfg.Channels[i] = fmt.Sprintf("%s.%s", channel, userID)
}
return nil
}

func (m *UserChannelRoutingModule) Module() core.ModuleInfo {
return core.ModuleInfo{
ID: ModuleID,
New: func() core.Module {
return &UserChannelRoutingModule{}
},
}
}

var _ core.SubscriptionOnCreateHandler = (*UserChannelRoutingModule)(nil)
```
44 changes: 44 additions & 0 deletions router-tests/modules/subscription-on-create/module.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package subscription_on_create

import (
"sync/atomic"

"go.uber.org/zap"

"github.com/wundergraph/cosmo/router/core"
)

const myModuleID = "subscriptionOnCreateModule"

type SubscriptionOnCreateModule struct {
Logger *zap.Logger
Callback func(ctx core.SubscriptionOnCreateHandlerContext) error
HookCallCount *atomic.Int32
}

func (m *SubscriptionOnCreateModule) Provision(ctx *core.ModuleContext) error {
m.Logger = ctx.Logger
return nil
}

func (m *SubscriptionOnCreateModule) SubscriptionOnCreate(ctx core.SubscriptionOnCreateHandlerContext) error {
if m.HookCallCount != nil {
m.HookCallCount.Add(1)
}
if m.Callback != nil {
return m.Callback(ctx)
}
return nil
}

func (m *SubscriptionOnCreateModule) Module() core.ModuleInfo {
return core.ModuleInfo{
ID: myModuleID,
Priority: 1,
New: func() core.Module {
return &SubscriptionOnCreateModule{}
},
}
}

var _ core.SubscriptionOnCreateHandler = (*SubscriptionOnCreateModule)(nil)
Loading
Loading