Skip to content
Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.

## Unreleased

### Added

- mongodb_cdc: Input now adds `schema` metadata to consumed messages. Schema is extracted from the collection's `$jsonSchema` validator when available, otherwise inferred from document structure. This can be used for automatic schema conversion in processors such as `parquet_encode`. (@Jeffail)

### Changed

- microsoft_sql_server_cdc: The `schema` metadata field (containing the SQL schema name of the source table) has been renamed to `database_schema`. The `common_schema` metadata field (containing the benthos common schema) has been renamed to `schema` for consistency with the `mysql_cdc` and `postgres_cdc` inputs. (@Jeffail)
20 changes: 18 additions & 2 deletions docs/modules/components/pages/inputs/mongodb_cdc.adoc
@@ -91,11 +91,27 @@ By default MongoDB does not propagate changes in all cases. In order to capture

== Metadata

Each message omitted by this plugin has the following metadata:
Each message emitted by this plugin has the following metadata:

- operation: either "create", "replace", "delete" or "update" for changes streamed. Documents from the initial snapshot have the operation set to "read".
- operation: either "insert", "replace", "delete" or "update" for changes streamed. Documents from the initial snapshot have the operation set to "read".
- collection: the collection the document was written to.
- operation_time: the oplog time for when this operation occurred.
- schema: the collection schema in benthos common schema format (set as immutable metadata). Extracted from the collection's `$jsonSchema` validator if available, otherwise inferred from the first document seen. Not present on messages where no schema could be determined (e.g. deletes without pre-images when no prior schema is cached).

== Schema Detection

Schema metadata is discovered using a two-tier strategy:

1. *$jsonSchema validators* are preferred and queried at startup for each watched collection. When a validator exists, the schema provides accurate type information and required/optional field classification.
2. When no validator exists, schema is *inferred from the first document* received per collection. All fields are marked optional.

*Change detection:* when a document's top-level field set differs from the cached schema, the schema is re-inferred from that document. This applies to both validator-sourced and inference-sourced schemas.

*Limitations:* type changes within existing fields and structural changes inside nested subdocuments are not detected automatically. Restart the input to force a full schema refresh.

*Fields with null values, unknown BSON types, or mixed-type arrays* are mapped to the `Any` schema type. The `parquet_encode` processor does not support `Any` and will error if it encounters one. Add an upstream processor (e.g. `mapping`) to convert or remove these fields before `parquet_encode`.

*Schema stability:* MongoDB collections may contain documents with varying field sets. When this occurs, the schema updates on each structural change, which can cause frequent schema version bumps in schema registries with compatibility modes. For schema registry targets, configuring a `$jsonSchema` validator on the collection is strongly recommended.


== Fields
112 changes: 104 additions & 8 deletions internal/impl/mongodb/cdc/input.go
@@ -67,11 +67,27 @@ By default MongoDB does not propagate changes in all cases. In order to capture

== Metadata

Each message omitted by this plugin has the following metadata:
Each message emitted by this plugin has the following metadata:

- operation: either "create", "replace", "delete" or "update" for changes streamed. Documents from the initial snapshot have the operation set to "read".
- operation: either "insert", "replace", "delete" or "update" for changes streamed. Documents from the initial snapshot have the operation set to "read".
- collection: the collection the document was written to.
- operation_time: the oplog time for when this operation occurred.
- schema: the collection schema in benthos common schema format (set as immutable metadata). Extracted from the collection's `+"`$jsonSchema`"+` validator if available, otherwise inferred from the first document seen. Not present on messages where no schema could be determined (e.g. deletes without pre-images when no prior schema is cached).

== Schema Detection

Schema metadata is discovered using a two-tier strategy:

1. *$jsonSchema validators* are preferred and queried at startup for each watched collection. When a validator exists, the schema provides accurate type information and required/optional field classification.
2. When no validator exists, schema is *inferred from the first document* received per collection. All fields are marked optional.

*Change detection:* when a document's top-level field set differs from the cached schema, the schema is re-inferred from that document. This applies to both validator-sourced and inference-sourced schemas.

*Limitations:* type changes within existing fields and structural changes inside nested subdocuments are not detected automatically. Restart the input to force a full schema refresh.

*Fields with null values, unknown BSON types, or mixed-type arrays* are mapped to the `+"`Any`"+` schema type. The `+"`parquet_encode`"+` processor does not support `+"`Any`"+` and will error if it encounters one. Add an upstream processor (e.g. `+"`mapping`"+`) to convert or remove these fields before `+"`parquet_encode`"+`.

*Schema stability:* MongoDB collections may contain documents with varying field sets. When this occurs, the schema updates on each structural change, which can cause frequent schema version bumps in schema registries with compatibility modes. For schema registry targets, configuring a `+"`$jsonSchema`"+` validator on the collection is strongly recommended.
`).
Fields(
service.NewStringField(fieldClientURL).
@@ -167,9 +183,10 @@ func newMongoCDC(conf *service.ParsedConfig, res *service.Resources) (i service.
return nil, err
}
cdc := &mongoCDC{
readChan: make(chan mongoBatch),
errorChan: make(chan error, 1),
logger: res.Logger(),
readChan: make(chan mongoBatch),
errorChan: make(chan error, 1),
logger: res.Logger(),
collectionSchemas: make(map[string]*cachedSchema),
}
var url, username, password, dbName, appName string
if url, err = conf.FieldString(fieldClientURL); err != nil {
@@ -340,6 +357,14 @@ type mongoCDC struct {

resumeToken bson.Raw
resumeTokenMu sync.Mutex

collectionSchemas map[string]*cachedSchema
collectionSchemasMu sync.RWMutex
}

type cachedSchema struct {
schema any // serialised Common Schema (from ToAny())
keys []string // sorted top-level field names for key-set fingerprinting
}

func (m *mongoCDC) Connect(ctx context.Context) error {
@@ -357,6 +382,11 @@ func (m *mongoCDC) Connect(ctx context.Context) error {
default:
}
}
// Reset schema cache on reconnect so stale schemas from a previous
// connection don't persist if collections were changed in between.
m.collectionSchemasMu.Lock()
m.collectionSchemas = make(map[string]*cachedSchema)
m.collectionSchemasMu.Unlock()
if err := m.client.Ping(ctx, nil); err != nil {
return fmt.Errorf("unable to ping mongodb: %w", err)
}
@@ -405,6 +435,21 @@ func (m *mongoCDC) Connect(ctx context.Context) error {
if !ok {
return fmt.Errorf("unable to get oplog last commit timestamp, got %s", helloReply.String())
}
// Tier 1: pre-fetch $jsonSchema validators for all watched collections
// during Connect() so the stream goroutine is not delayed.
for _, coll := range m.collections {
s, keys, err := fetchCollectionSchema(ctx, m.db, coll)
if err != nil {
m.logger.Warnf("Failed to fetch $jsonSchema for collection %s: %v", coll, err)
continue
}
if s != nil {
m.collectionSchemasMu.Lock()
m.collectionSchemas[coll] = &cachedSchema{schema: s, keys: keys}
m.collectionSchemasMu.Unlock()
}
}

shutsig := shutdown.NewSignaller()
m.shutsig = shutsig
go func() {
@@ -415,6 +460,7 @@ func (m *mongoCDC) Connect(ctx context.Context) error {
}
defer cancel()
defer shutsig.TriggerHasStopped()

opts := options.ChangeStream().
SetBatchSize(int32(m.readBatchSize)).
SetMaxAwaitTime(m.streamMaxWait)
@@ -649,7 +695,7 @@ func (m *mongoCDC) readSnapshotRange(
if err := cursor.Decode(&doc); err != nil {
return fmt.Errorf("unable to decode document: %w", err)
}
msg, err := m.newMongoDBCDCMessage(doc, "read", coll.Name(), snapshotTime)
msg, err := m.newMongoDBCDCMessage(doc, "read", coll.Name(), snapshotTime, false)
if err != nil {
return fmt.Errorf("unable to create message from document: %w", err)
}
@@ -749,6 +795,7 @@ func (m *mongoCDC) readFromStream(ctx context.Context, cp *checkpoint.Capped[bso
return fmt.Errorf("unable to extract operation type from change string, got: %s", data)
}
var doc any
var keyOnly bool // true when doc is documentKey-only or synthetic partial update
switch opType {
case "update":
if m.docMode == documentModePartialUpdate {
@@ -819,6 +866,7 @@ func (m *mongoCDC) readFromStream(ctx context.Context, cp *checkpoint.Capped[bso
}
key["operations"] = ops
doc = key
keyOnly = true // synthetic structure, don't infer schema from it
break
}
fallthrough
@@ -833,6 +881,7 @@ func (m *mongoCDC) readFromStream(ctx context.Context, cp *checkpoint.Capped[bso
if doc == nil {
// this is when pre images are not available
doc = data["documentKey"]
keyOnly = true
}
case "invalidate":
return errors.New("watch stream invalidated")
@@ -848,7 +897,7 @@ func (m *mongoCDC) readFromStream(ctx context.Context, cp *checkpoint.Capped[bso
if !ok {
return fmt.Errorf("unable to extract optime from change stream, got: %T", data["clusterTime"])
}
msg, err := m.newMongoDBCDCMessage(doc, opType, coll, optime)
msg, err := m.newMongoDBCDCMessage(doc, opType, coll, optime, keyOnly)
if err != nil {
return fmt.Errorf("unable to create message from change stream event: %w", err)
}
@@ -884,7 +933,12 @@ func (m *mongoCDC) readFromStream(ctx context.Context, cp *checkpoint.Capped[bso
return stream.Err()
}

func (m *mongoCDC) newMongoDBCDCMessage(doc any, operationType, collectionName string, opTime bson.Timestamp) (msg *service.Message, err error) {
// newMongoDBCDCMessage creates a service.Message from a BSON document with
// appropriate metadata. When keyOnly is true the document represents only a
// documentKey (e.g. a delete without pre-images) or a synthetic partial-update
// structure — schema inference is skipped and only the cached schema (if any)
// is attached.
func (m *mongoCDC) newMongoDBCDCMessage(doc any, operationType, collectionName string, opTime bson.Timestamp, keyOnly bool) (msg *service.Message, err error) {
var b []byte
if doc != nil {
b, err = bson.MarshalExtJSON(doc, m.marshalCanonical, false)
@@ -904,9 +958,51 @@ func (m *mongoCDC) newMongoDBCDCMessage(doc any, operationType, collectionName s
// This is the JSON format for a timestamp, but the normalize serialization stuff doesn't support writing
// one at the top level.
msg.MetaSetMut("operation_time", fmt.Sprintf(`{"$timestamp":{"t":%d,"i":%d}}`, opTime.T, opTime.I))

// Attach schema metadata.
if docM, ok := doc.(bson.M); ok {
var s any
if keyOnly {
s = m.getCachedSchema(collectionName)
} else {
s = m.getOrInferCollectionSchema(collectionName, docM)
}
if s != nil {
msg.MetaSetImmut("schema", service.ImmutableAny{V: s})
}
}
return msg, nil
}

// getOrInferCollectionSchema returns the cached schema if the document's key
// set matches, or infers a new schema and updates the cache.
func (m *mongoCDC) getOrInferCollectionSchema(collectionName string, doc bson.M) any {
docKeys := sortedMapKeys(doc)

m.collectionSchemasMu.Lock()
defer m.collectionSchemasMu.Unlock()

if cached, ok := m.collectionSchemas[collectionName]; ok && slices.Equal(cached.keys, docKeys) {
return cached.schema
}

// Cache miss or key-set mismatch — (re-)infer.
s, keys := inferSchemaFromDocument(collectionName, doc)
m.collectionSchemas[collectionName] = &cachedSchema{schema: s, keys: keys}
return s
}

// getCachedSchema returns the cached schema for a collection without inference.
// Used for keyOnly documents (deletes with documentKey, partial updates).
func (m *mongoCDC) getCachedSchema(collectionName string) any {
m.collectionSchemasMu.RLock()
defer m.collectionSchemasMu.RUnlock()
if cached, ok := m.collectionSchemas[collectionName]; ok {
return cached.schema
}
return nil
}
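The `inferSchemaFromDocument` helper called above is not shown in this diff. A minimal sketch of the type-mapping behaviour the docs describe, where null values and unrecognised types fall back to `Any` and every field is optional, might look like this (illustrative only; the function name and type names are assumptions, not the plugin's API):

```go
package main

import "fmt"

// inferFieldType maps a decoded BSON value to a coarse schema type name.
// Nil values and unrecognised types fall back to "Any", which downstream
// processors such as parquet_encode cannot handle.
func inferFieldType(v any) string {
	switch v.(type) {
	case nil:
		return "Any"
	case string:
		return "String"
	case int, int32, int64:
		return "Int64"
	case float64:
		return "Float64"
	case bool:
		return "Boolean"
	case map[string]any:
		return "Struct"
	default:
		return "Any"
	}
}

func main() {
	doc := map[string]any{"name": "a", "age": int64(3), "tags": nil}
	for k, v := range doc {
		// Inference cannot see absent documents, so every field is optional.
		fmt.Printf("%s: %s (optional)\n", k, inferFieldType(v))
	}
}
```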

func (m *mongoCDC) ReadBatch(ctx context.Context) (service.MessageBatch, service.AckFunc, error) {
select {
case mb := <-m.readChan: