diff --git a/CHANGELOG.md b/CHANGELOG.md index 923f8153c8..c5f0b19c3d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file. ## Unreleased +### Added + +- mongodb_cdc: Input now adds `schema` metadata to consumed messages. Schema is extracted from the collection's `$jsonSchema` validator when available, otherwise inferred from document structure. This can be used for automatic schema conversion in processors such as `parquet_encode`. (@Jeffail) + ### Changed - microsoft_sql_server_cdc: The `schema` metadata field (containing the SQL schema name of the source table) has been renamed to `database_schema`. The `common_schema` metadata field (containing the benthos common schema) has been renamed to `schema` for consistency with the `mysql_cdc` and `postgres_cdc` inputs. (@Jeffail) diff --git a/docs/modules/components/pages/inputs/mongodb_cdc.adoc b/docs/modules/components/pages/inputs/mongodb_cdc.adoc index 15911b5d1e..aec8d96660 100644 --- a/docs/modules/components/pages/inputs/mongodb_cdc.adoc +++ b/docs/modules/components/pages/inputs/mongodb_cdc.adoc @@ -91,11 +91,27 @@ By default MongoDB does not propagate changes in all cases. In order to capture == Metadata -Each message omitted by this plugin has the following metadata: +Each message emitted by this plugin has the following metadata: -- operation: either "create", "replace", "delete" or "update" for changes streamed. Documents from the initial snapshot have the operation set to "read". +- operation: either "insert", "replace", "delete" or "update" for changes streamed. Documents from the initial snapshot have the operation set to "read". - collection: the collection the document was written to. - operation_time: the oplog time for when this operation occurred. +- schema: the collection schema in benthos common schema format (set as immutable metadata). 
Extracted from the collection's `$jsonSchema` validator if available, otherwise inferred from the first document seen. Not present on messages where no schema could be determined (e.g. deletes without pre-images when no prior schema is cached). + +== Schema Detection + +Schema metadata is discovered using a two-tier strategy: + +1. *$jsonSchema validators* are preferred and queried at startup for each watched collection. When a validator exists, the schema provides accurate type information and required/optional field classification. +2. When no validator exists, schema is *inferred from the first document* received per collection. All fields are marked optional. + +*Change detection:* when a document's top-level field set differs from the cached schema, the schema is re-inferred from that document. This applies to both validator-sourced and inference-sourced schemas. + +*Limitations:* type changes within existing fields and structural changes inside nested subdocuments are not detected automatically. Restart the input to force a full schema refresh. + +*Fields with null values, unknown BSON types, or mixed-type arrays* are mapped to the `Any` schema type. The `parquet_encode` processor does not support `Any` and will error if it encounters one. Add an upstream processor (e.g. `mapping`) to convert or remove these fields before `parquet_encode`. + +*Schema stability:* MongoDB collections may contain documents with varying field sets. When this occurs, the schema updates on each structural change, which can cause frequent schema version bumps in schema registries with compatibility modes. For schema registry targets, configuring a `$jsonSchema` validator on the collection is strongly recommended. 
== Fields diff --git a/internal/impl/mongodb/cdc/input.go b/internal/impl/mongodb/cdc/input.go index 91133f5f27..0b3f6fb38e 100644 --- a/internal/impl/mongodb/cdc/input.go +++ b/internal/impl/mongodb/cdc/input.go @@ -67,11 +67,27 @@ By default MongoDB does not propagate changes in all cases. In order to capture == Metadata -Each message omitted by this plugin has the following metadata: +Each message emitted by this plugin has the following metadata: -- operation: either "create", "replace", "delete" or "update" for changes streamed. Documents from the initial snapshot have the operation set to "read". +- operation: either "insert", "replace", "delete" or "update" for changes streamed. Documents from the initial snapshot have the operation set to "read". - collection: the collection the document was written to. - operation_time: the oplog time for when this operation occurred. +- schema: the collection schema in benthos common schema format (set as immutable metadata). Extracted from the collection's `+"`$jsonSchema`"+` validator if available, otherwise inferred from the first document seen. Not present on messages where no schema could be determined (e.g. deletes without pre-images when no prior schema is cached). + +== Schema Detection + +Schema metadata is discovered using a two-tier strategy: + +1. *$jsonSchema validators* are preferred and queried at startup for each watched collection. When a validator exists, the schema provides accurate type information and required/optional field classification. +2. When no validator exists, schema is *inferred from the first document* received per collection. All fields are marked optional. + +*Change detection:* when a document's top-level field set differs from the cached schema, the schema is re-inferred from that document. This applies to both validator-sourced and inference-sourced schemas. 
+ +*Limitations:* type changes within existing fields and structural changes inside nested subdocuments are not detected automatically. Restart the input to force a full schema refresh. + +*Fields with null values, unknown BSON types, or mixed-type arrays* are mapped to the `+"`Any`"+` schema type. The `+"`parquet_encode`"+` processor does not support `+"`Any`"+` and will error if it encounters one. Add an upstream processor (e.g. `+"`mapping`"+`) to convert or remove these fields before `+"`parquet_encode`"+`. + +*Schema stability:* MongoDB collections may contain documents with varying field sets. When this occurs, the schema updates on each structural change, which can cause frequent schema version bumps in schema registries with compatibility modes. For schema registry targets, configuring a `+"`$jsonSchema`"+` validator on the collection is strongly recommended. `). Fields( service.NewStringField(fieldClientURL). @@ -167,9 +183,10 @@ func newMongoCDC(conf *service.ParsedConfig, res *service.Resources) (i service. 
return nil, err } cdc := &mongoCDC{ - readChan: make(chan mongoBatch), - errorChan: make(chan error, 1), - logger: res.Logger(), + readChan: make(chan mongoBatch), + errorChan: make(chan error, 1), + logger: res.Logger(), + collectionSchemas: make(map[string]*cachedSchema), } var url, username, password, dbName, appName string if url, err = conf.FieldString(fieldClientURL); err != nil { @@ -340,6 +357,14 @@ type mongoCDC struct { resumeToken bson.Raw resumeTokenMu sync.Mutex + + collectionSchemas map[string]*cachedSchema + collectionSchemasMu sync.RWMutex +} + +type cachedSchema struct { + schema any // serialised Common Schema (from ToAny()) + keys []string // sorted top-level field names for key-set fingerprinting } func (m *mongoCDC) Connect(ctx context.Context) error { @@ -357,6 +382,11 @@ func (m *mongoCDC) Connect(ctx context.Context) error { default: } } + // Reset schema cache on reconnect so stale schemas from a previous + // connection don't persist if collections were changed in between. + m.collectionSchemasMu.Lock() + m.collectionSchemas = make(map[string]*cachedSchema) + m.collectionSchemasMu.Unlock() if err := m.client.Ping(ctx, nil); err != nil { return fmt.Errorf("unable to ping mongodb: %w", err) } @@ -405,6 +435,21 @@ func (m *mongoCDC) Connect(ctx context.Context) error { if !ok { return fmt.Errorf("unable to get oplog last commit timestamp, got %s", helloReply.String()) } + // Tier 1: pre-fetch $jsonSchema validators for all watched collections + // during Connect() so the stream goroutine is not delayed. 
+ for _, coll := range m.collections { + s, keys, err := fetchCollectionSchema(ctx, m.db, coll) + if err != nil { + m.logger.Warnf("Failed to fetch $jsonSchema for collection %s: %v", coll, err) + continue + } + if s != nil { + m.collectionSchemasMu.Lock() + m.collectionSchemas[coll] = &cachedSchema{schema: s, keys: keys} + m.collectionSchemasMu.Unlock() + } + } + shutsig := shutdown.NewSignaller() m.shutsig = shutsig go func() { @@ -415,6 +460,7 @@ func (m *mongoCDC) Connect(ctx context.Context) error { } defer cancel() defer shutsig.TriggerHasStopped() + opts := options.ChangeStream(). SetBatchSize(int32(m.readBatchSize)). SetMaxAwaitTime(m.streamMaxWait) @@ -649,7 +695,7 @@ func (m *mongoCDC) readSnapshotRange( if err := cursor.Decode(&doc); err != nil { return fmt.Errorf("unable to decode document: %w", err) } - msg, err := m.newMongoDBCDCMessage(doc, "read", coll.Name(), snapshotTime) + msg, err := m.newMongoDBCDCMessage(doc, "read", coll.Name(), snapshotTime, false) if err != nil { return fmt.Errorf("unable to create message from document: %w", err) } @@ -749,6 +795,7 @@ func (m *mongoCDC) readFromStream(ctx context.Context, cp *checkpoint.Capped[bso return fmt.Errorf("unable to extract operation type from change string, got: %s", data) } var doc any + var keyOnly bool // true when doc is documentKey-only or synthetic partial update switch opType { case "update": if m.docMode == documentModePartialUpdate { @@ -819,6 +866,7 @@ func (m *mongoCDC) readFromStream(ctx context.Context, cp *checkpoint.Capped[bso } key["operations"] = ops doc = key + keyOnly = true // synthetic structure, don't infer schema from it break } fallthrough @@ -833,6 +881,7 @@ func (m *mongoCDC) readFromStream(ctx context.Context, cp *checkpoint.Capped[bso if doc == nil { // this is when pre images are not available doc = data["documentKey"] + keyOnly = true } case "invalidate": return errors.New("watch stream invalidated") @@ -848,7 +897,7 @@ func (m *mongoCDC) readFromStream(ctx 
context.Context, cp *checkpoint.Capped[bso if !ok { return fmt.Errorf("unable to extract optime from change stream, got: %T", data["clusterTime"]) } - msg, err := m.newMongoDBCDCMessage(doc, opType, coll, optime) + msg, err := m.newMongoDBCDCMessage(doc, opType, coll, optime, keyOnly) if err != nil { return fmt.Errorf("unable to create message from change stream event: %w", err) } @@ -884,7 +933,12 @@ func (m *mongoCDC) readFromStream(ctx context.Context, cp *checkpoint.Capped[bso return stream.Err() } -func (m *mongoCDC) newMongoDBCDCMessage(doc any, operationType, collectionName string, opTime bson.Timestamp) (msg *service.Message, err error) { +// newMongoDBCDCMessage creates a service.Message from a BSON document with +// appropriate metadata. When keyOnly is true the document represents only a +// documentKey (e.g. a delete without pre-images) or a synthetic partial-update +// structure — schema inference is skipped and only the cached schema (if any) +// is attached. +func (m *mongoCDC) newMongoDBCDCMessage(doc any, operationType, collectionName string, opTime bson.Timestamp, keyOnly bool) (msg *service.Message, err error) { var b []byte if doc != nil { b, err = bson.MarshalExtJSON(doc, m.marshalCanonical, false) @@ -904,9 +958,51 @@ func (m *mongoCDC) newMongoDBCDCMessage(doc any, operationType, collectionName s // This is the JSON format for a timestamp, but the normalize serialization stuff doesn't support writing // one at the top level. msg.MetaSetMut("operation_time", fmt.Sprintf(`{"$timestamp":{"t":%d,"i":%d}}`, opTime.T, opTime.I)) + + // Attach schema metadata. 
+ if docM, ok := doc.(bson.M); ok { + var s any + if keyOnly { + s = m.getCachedSchema(collectionName) + } else { + s = m.getOrInferCollectionSchema(collectionName, docM) + } + if s != nil { + msg.MetaSetImmut("schema", service.ImmutableAny{V: s}) + } + } return msg, nil } +// getOrInferCollectionSchema returns the cached schema if the document's key +// set matches, or infers a new schema and updates the cache. +func (m *mongoCDC) getOrInferCollectionSchema(collectionName string, doc bson.M) any { + docKeys := sortedMapKeys(doc) + + m.collectionSchemasMu.Lock() + defer m.collectionSchemasMu.Unlock() + + if cached, ok := m.collectionSchemas[collectionName]; ok && slices.Equal(cached.keys, docKeys) { + return cached.schema + } + + // Cache miss or key-set mismatch — (re-)infer. + s, keys := inferSchemaFromDocument(collectionName, doc) + m.collectionSchemas[collectionName] = &cachedSchema{schema: s, keys: keys} + return s +} + +// getCachedSchema returns the cached schema for a collection without inference. +// Used for keyOnly documents (deletes with documentKey, partial updates). 
+func (m *mongoCDC) getCachedSchema(collectionName string) any { + m.collectionSchemasMu.RLock() + defer m.collectionSchemasMu.RUnlock() + if cached, ok := m.collectionSchemas[collectionName]; ok { + return cached.schema + } + return nil +} + func (m *mongoCDC) ReadBatch(ctx context.Context) (service.MessageBatch, service.AckFunc, error) { select { case mb := <-m.readChan: diff --git a/internal/impl/mongodb/cdc/integration_test.go b/internal/impl/mongodb/cdc/integration_test.go index 967762c3fe..17cf92c0c3 100644 --- a/internal/impl/mongodb/cdc/integration_test.go +++ b/internal/impl/mongodb/cdc/integration_test.go @@ -29,6 +29,7 @@ import ( "go.mongodb.org/mongo-driver/v2/mongo/options" _ "github.com/redpanda-data/benthos/v4/public/components/io" + "github.com/redpanda-data/benthos/v4/public/schema" "github.com/redpanda-data/benthos/v4/public/service" "github.com/redpanda-data/benthos/v4/public/service/integration" @@ -238,6 +239,8 @@ func (o *outputHelper) Metadata(t *testing.T) []map[string]any { case "operation_time": // Make this deterministic meta[k] = "$timestamp" + case "schema": + // Schema is complex structured metadata, tested separately default: meta[k] = v } @@ -257,6 +260,34 @@ func (o *outputHelper) MetadataJSON(t *testing.T) string { return string(b) } +// Schemas returns the parsed schema.Common for each message. Messages without +// schema metadata produce a zero-value entry. 
+func (o *outputHelper) Schemas(t *testing.T) []schema.Common { + t.Helper() + o.mu.Lock() + defer o.mu.Unlock() + var schemas []schema.Common + for _, b := range o.batches { + for _, m := range b { + var s schema.Common + var raw any + _ = m.MetaWalkMut(func(k string, v any) error { + if k == "schema" { + raw = v + } + return nil + }) + if raw != nil { + parsed, err := schema.ParseFromAny(raw) + require.NoError(t, err) + s = parsed + } + schemas = append(schemas, s) + } + } + return schemas +} + type setupOption = func(client *mongo.Client) error func enablePreAndPostDocuments() setupOption { @@ -880,3 +911,325 @@ file: } } } + +// --------------------------------------------------------------------------- +// Schema integration tests +// --------------------------------------------------------------------------- + +func TestIntegrationMongoCDCSchemaOnInsert(t *testing.T) { + stream, db, output := setup(t, ` +mongodb_cdc: + url: '$URI' + database: '$DATABASE' + checkpoint_cache: '$CACHE' + collections: + - 'foo' +`) + db.CreateCollection(t, "foo") + wait := stream.RunAsync(t) + time.Sleep(2 * time.Second) + db.InsertOne(t, "foo", bson.M{"_id": "1", "name": "alice", "age": int32(30)}) + time.Sleep(3 * time.Second) + stream.StopWithin(t, 10*time.Second) + wait() + + schemas := output.Schemas(t) + require.Len(t, schemas, 1) + s := schemas[0] + assert.Equal(t, "foo", s.Name) + assert.Equal(t, schema.Object, s.Type) + require.Len(t, s.Children, 3) + // Alphabetically sorted + assert.Equal(t, "_id", s.Children[0].Name) + assert.Equal(t, schema.String, s.Children[0].Type) + assert.Equal(t, "age", s.Children[1].Name) + assert.Equal(t, schema.Int32, s.Children[1].Type) + assert.Equal(t, "name", s.Children[2].Name) + assert.Equal(t, schema.String, s.Children[2].Type) + for _, c := range s.Children { + assert.True(t, c.Optional) + } +} + +func TestIntegrationMongoCDCSnapshotSchema(t *testing.T) { + stream, db, output := setup(t, ` +read_until: + idle_timeout: 3s + input: + 
mongodb_cdc: + url: '$URI' + database: '$DATABASE' + checkpoint_cache: '$CACHE' + stream_snapshot: true + collections: + - 'foo' +`) + db.CreateCollection(t, "foo") + for i := range 5 { + db.InsertOne(t, "foo", bson.M{"_id": i + 1, "name": fmt.Sprintf("user%d", i), "value": "x"}) + } + stream.Run(t) + stream.Stop(t) + + schemas := output.Schemas(t) + require.GreaterOrEqual(t, len(schemas), 5) + for i, s := range schemas { + assert.Equal(t, "foo", s.Name, "schema %d", i) + assert.Equal(t, schema.Object, s.Type, "schema %d", i) + require.Len(t, s.Children, 3, "schema %d", i) + assert.Equal(t, "_id", s.Children[0].Name) + assert.Equal(t, "name", s.Children[1].Name) + assert.Equal(t, "value", s.Children[2].Name) + } +} + +func TestIntegrationMongoCDCSchemaChange(t *testing.T) { + stream, db, output := setup(t, ` +read_until: + idle_timeout: 3s + input: + mongodb_cdc: + url: '$URI' + database: '$DATABASE' + checkpoint_cache: '$CACHE' + stream_snapshot: true + collections: + - 'foo' +`) + db.CreateCollection(t, "foo") + // First doc: 2 fields + db.InsertOne(t, "foo", bson.M{"_id": 1, "name": "alice"}) + wait := stream.RunAsync(t) + time.Sleep(2 * time.Second) + // Second doc: 3 fields — triggers schema change via key-set fingerprinting + db.InsertOne(t, "foo", bson.M{"_id": 2, "name": "bob", "email": "bob@test.com"}) + time.Sleep(3 * time.Second) + stream.StopWithin(t, 10*time.Second) + wait() + + schemas := output.Schemas(t) + require.GreaterOrEqual(t, len(schemas), 2) + // First message (snapshot): [_id, name] + assert.Len(t, schemas[0].Children, 2) + assert.Equal(t, "_id", schemas[0].Children[0].Name) + assert.Equal(t, "name", schemas[0].Children[1].Name) + // Last message (insert with email): [_id, email, name] + last := schemas[len(schemas)-1] + assert.Len(t, last.Children, 3) + assert.Equal(t, "_id", last.Children[0].Name) + assert.Equal(t, "email", last.Children[1].Name) + assert.Equal(t, "name", last.Children[2].Name) +} + +func 
TestIntegrationMongoCDCSchemaOrdering(t *testing.T) { + stream, db, output := setup(t, ` +read_until: + idle_timeout: 3s + input: + mongodb_cdc: + url: '$URI' + database: '$DATABASE' + checkpoint_cache: '$CACHE' + stream_snapshot: true + collections: + - 'foo' +`) + db.CreateCollection(t, "foo") + for i := range 20 { + db.InsertOne(t, "foo", bson.M{ + "_id": i + 1, + "zulu": "z", + "alpha": "a", + "mike": "m", + }) + } + stream.Run(t) + stream.Stop(t) + + schemas := output.Schemas(t) + require.GreaterOrEqual(t, len(schemas), 20) + expected := []string{"_id", "alpha", "mike", "zulu"} + for i, s := range schemas { + names := make([]string, len(s.Children)) + for j, c := range s.Children { + names[j] = c.Name + } + assert.Equal(t, expected, names, "schema %d has wrong field order", i) + } +} + +func TestIntegrationMongoCDCMultiCollectionSchema(t *testing.T) { + stream, db, output := setup(t, ` +mongodb_cdc: + url: '$URI' + database: '$DATABASE' + checkpoint_cache: '$CACHE' + collections: + - 'users' + - 'events' +`) + db.CreateCollection(t, "users") + db.CreateCollection(t, "events") + wait := stream.RunAsync(t) + time.Sleep(2 * time.Second) + db.InsertOne(t, "users", bson.M{"_id": "1", "name": "alice", "age": int32(30)}) + db.InsertOne(t, "events", bson.M{"_id": "1", "type": "login", "ts": bson.DateTime(time.Now().UnixMilli())}) + time.Sleep(3 * time.Second) + stream.StopWithin(t, 10*time.Second) + wait() + + schemas := output.Schemas(t) + require.Len(t, schemas, 2) + + // Find schemas by collection name + schemaByName := map[string]schema.Common{} + for _, s := range schemas { + schemaByName[s.Name] = s + } + + users := schemaByName["users"] + require.Len(t, users.Children, 3) + assert.Equal(t, "_id", users.Children[0].Name) + assert.Equal(t, schema.String, users.Children[0].Type) + assert.Equal(t, "age", users.Children[1].Name) + assert.Equal(t, schema.Int32, users.Children[1].Type) + assert.Equal(t, "name", users.Children[2].Name) + assert.Equal(t, schema.String, 
users.Children[2].Type) + + events := schemaByName["events"] + require.Len(t, events.Children, 3) + assert.Equal(t, "_id", events.Children[0].Name) + assert.Equal(t, schema.String, events.Children[0].Type) + assert.Equal(t, "ts", events.Children[1].Name) + assert.Equal(t, schema.Timestamp, events.Children[1].Type) + assert.Equal(t, "type", events.Children[2].Name) + assert.Equal(t, schema.String, events.Children[2].Type) +} + +func TestIntegrationMongoCDCDeleteUsesCache(t *testing.T) { + stream, db, output := setup(t, ` +mongodb_cdc: + url: '$URI' + database: '$DATABASE' + checkpoint_cache: '$CACHE' + collections: + - 'foo' +`) + db.CreateCollection(t, "foo") + wait := stream.RunAsync(t) + time.Sleep(2 * time.Second) + db.InsertOne(t, "foo", bson.M{"_id": "1", "name": "alice"}) + time.Sleep(time.Second) + db.DeleteByID(t, "foo", "1") + time.Sleep(3 * time.Second) + stream.StopWithin(t, 10*time.Second) + wait() + + schemas := output.Schemas(t) + require.Len(t, schemas, 2) + // Insert schema + assert.Equal(t, "foo", schemas[0].Name) + assert.Len(t, schemas[0].Children, 2) + // Delete should use cached schema (same as insert) + assert.Equal(t, "foo", schemas[1].Name) + assert.Len(t, schemas[1].Children, 2) + assert.Equal(t, schemas[0].Children[0].Name, schemas[1].Children[0].Name) + assert.Equal(t, schemas[0].Children[1].Name, schemas[1].Children[1].Name) +} + +func TestIntegrationMongoCDCSchemaValidator(t *testing.T) { + stream, db, output := setup(t, ` +mongodb_cdc: + url: '$URI' + database: '$DATABASE' + checkpoint_cache: '$CACHE' + collections: + - 'foo' +`) + db.CreateCollection(t, "foo", options.CreateCollection().SetValidator(bson.M{ + "$jsonSchema": bson.M{ + "bsonType": "object", + "required": bson.A{"name"}, + "properties": bson.M{ + "name": bson.M{"bsonType": "string"}, + "age": bson.M{"bsonType": "int"}, + "active": bson.M{"bsonType": "bool"}, + }, + }, + })) + wait := stream.RunAsync(t) + time.Sleep(2 * time.Second) + // Insert a document that matches the 
validator and also has _id (not in the validator). + db.InsertOne(t, "foo", bson.M{"_id": "1", "name": "alice", "age": int32(30), "active": true}) + time.Sleep(3 * time.Second) + stream.StopWithin(t, 10*time.Second) + wait() + + schemas := output.Schemas(t) + require.Len(t, schemas, 1) + s := schemas[0] + assert.Equal(t, "foo", s.Name) + assert.Equal(t, schema.Object, s.Type) + // The $jsonSchema validator has 3 properties (name, age, active). The _id field + // is auto-injected into the Tier 1 schema so the key-set fingerprint matches the + // document's 4 fields (_id, active, age, name). The Tier 1 schema is preserved, + // keeping the required/optional classification from the validator. + require.Len(t, s.Children, 4) + assert.Equal(t, "_id", s.Children[0].Name) + assert.Equal(t, schema.String, s.Children[0].Type) + assert.True(t, s.Children[0].Optional) // auto-injected + + assert.Equal(t, "active", s.Children[1].Name) + assert.Equal(t, schema.Boolean, s.Children[1].Type) + assert.True(t, s.Children[1].Optional) // not in required + + assert.Equal(t, "age", s.Children[2].Name) + assert.Equal(t, schema.Int32, s.Children[2].Type) + assert.True(t, s.Children[2].Optional) // not in required + + assert.Equal(t, "name", s.Children[3].Name) + assert.Equal(t, schema.String, s.Children[3].Type) + assert.False(t, s.Children[3].Optional) // in required — Tier 1 preserved +} + +func TestIntegrationMongoCDCPartialUpdateSchema(t *testing.T) { + stream, db, output := setup(t, ` +mongodb_cdc: + url: '$URI' + database: '$DATABASE' + checkpoint_cache: '$CACHE' + document_mode: partial_update + collections: + - 'foo' +`) + db.CreateCollection(t, "foo") + wait := stream.RunAsync(t) + time.Sleep(2 * time.Second) + db.InsertOne(t, "foo", bson.M{"_id": "1", "name": "alice", "age": int32(30)}) + time.Sleep(time.Second) + db.UpdateOne(t, "foo", "1", bson.M{"$set": bson.M{"age": int32(31)}}) + time.Sleep(3 * time.Second) + stream.StopWithin(t, 10*time.Second) + wait() + + msgs := 
output.Messages(t) + require.Len(t, msgs, 2) + schemas := output.Schemas(t) + require.Len(t, schemas, 2) + + // Insert: full document schema — [_id: String, age: Int32, name: String] + assert.Equal(t, "foo", schemas[0].Name) + require.Len(t, schemas[0].Children, 3) + assert.Equal(t, "_id", schemas[0].Children[0].Name) + assert.Equal(t, "age", schemas[0].Children[1].Name) + assert.Equal(t, schema.Int32, schemas[0].Children[1].Type) + assert.Equal(t, "name", schemas[0].Children[2].Name) + + // Partial update: should use the CACHED schema from the insert, NOT infer + // from the synthetic {_id, operations} structure. + assert.Equal(t, "foo", schemas[1].Name) + require.Len(t, schemas[1].Children, 3, "partial update should use cached 3-field schema, not synthetic doc") + assert.Equal(t, "_id", schemas[1].Children[0].Name) + assert.Equal(t, "age", schemas[1].Children[1].Name) + assert.Equal(t, "name", schemas[1].Children[2].Name) +} diff --git a/internal/impl/mongodb/cdc/schema.go b/internal/impl/mongodb/cdc/schema.go new file mode 100644 index 0000000000..5b373a6960 --- /dev/null +++ b/internal/impl/mongodb/cdc/schema.go @@ -0,0 +1,355 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed as a Redpanda Enterprise file under the Redpanda Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/redpanda-data/connect/v4/blob/main/licenses/rcl.md + +package cdc + +import ( + "context" + "fmt" + "slices" + "time" + + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + + "github.com/redpanda-data/benthos/v4/public/schema" +) + +// --------------------------------------------------------------------------- +// Tier 1: $jsonSchema validator conversion +// --------------------------------------------------------------------------- + +// fetchCollectionSchema queries the collection's $jsonSchema validator via +// listCollections and converts it to a serialised schema.Common. Returns +// (nil, nil, nil) when no validator is configured. +func fetchCollectionSchema(ctx context.Context, db *mongo.Database, collectionName string) (any, []string, error) { + cursor, err := db.ListCollections(ctx, bson.M{"name": collectionName}) + if err != nil { + return nil, nil, fmt.Errorf("listing collections: %w", err) + } + defer cursor.Close(ctx) + + if !cursor.Next(ctx) { + return nil, nil, nil // collection not found + } + var info bson.M + if err := cursor.Decode(&info); err != nil { + return nil, nil, fmt.Errorf("decoding collection info: %w", err) + } + + opts, _ := info["options"].(bson.M) + if opts == nil { + return nil, nil, nil + } + validator, _ := opts["validator"].(bson.M) + if validator == nil { + return nil, nil, nil + } + jsonSchema, _ := validator["$jsonSchema"].(bson.M) + if jsonSchema == nil { + return nil, nil, nil + } + + s, keys, err := schemaFromJSONSchema(collectionName, jsonSchema) + if err != nil { + return nil, nil, fmt.Errorf("converting $jsonSchema: %w", err) + } + return s, keys, nil +} + +// schemaFromJSONSchema converts a MongoDB $jsonSchema validator to a serialised +// schema.Common. Returns (nil, nil, nil) if the validator cannot be converted +// (e.g. only uses combinators with no properties). 
+func schemaFromJSONSchema(collectionName string, jsonSchema bson.M) (any, []string, error) { + props, _ := jsonSchema["properties"].(bson.M) + if props == nil { + // Top-level validator with no properties (e.g. pure oneOf/anyOf) — + // fall back to Tier 2. + return nil, nil, nil + } + + requiredSet := map[string]bool{} + if reqArr, ok := jsonSchema["required"].(bson.A); ok { + for _, r := range reqArr { + if s, ok := r.(string); ok { + requiredSet[s] = true + } + } + } + + children, keys := jsonSchemaPropsToChildren(props, requiredSet) + + // $jsonSchema validators almost never declare _id, but every document has + // it. Without _id the key-set fingerprint will always mismatch on the + // first real document and the Tier 1 schema will be discarded immediately. + // Inject _id as an optional String field when it is not already present. + if !slices.Contains(keys, "_id") { + children = slices.Insert(children, 0, schema.Common{Name: "_id", Type: schema.String, Optional: true}) + keys = slices.Insert(keys, 0, "_id") + } + + c := schema.Common{ + Name: collectionName, + Type: schema.Object, + Optional: false, + Children: children, + } + return c.ToAny(), keys, nil +} + +// jsonSchemaPropsToChildren converts a $jsonSchema properties map to sorted +// schema.Common children and returns the sorted key list. +func jsonSchemaPropsToChildren(props bson.M, requiredSet map[string]bool) ([]schema.Common, []string) { + keys := sortedMapKeys(props) + children := make([]schema.Common, 0, len(keys)) + for _, name := range keys { + fieldSchema, ok := props[name].(bson.M) + if !ok { + children = append(children, schema.Common{ + Name: name, + Type: schema.Any, + Optional: !requiredSet[name], + }) + continue + } + children = append(children, jsonSchemaFieldToCommon(name, fieldSchema, requiredSet[name])) + } + return children, keys +} + +// jsonSchemaFieldToCommon converts a single $jsonSchema field definition to a +// schema.Common. 
+func jsonSchemaFieldToCommon(name string, fieldSchema bson.M, required bool) schema.Common { + // Check for combinators that we can't convert — map to Any. + for _, combinator := range []string{"oneOf", "anyOf", "allOf", "not"} { + if _, hasCombinator := fieldSchema[combinator]; hasCombinator { + return schema.Common{Name: name, Type: schema.Any, Optional: !required} + } + } + + bsonType, optional := resolveBsonType(fieldSchema) + ct := bsonTypeStringToCommon(bsonType) + + c := schema.Common{ + Name: name, + Type: ct, + Optional: !required || optional, + } + + if ct == schema.Object { + if nestedProps, ok := fieldSchema["properties"].(bson.M); ok { + nestedRequired := map[string]bool{} + if reqArr, ok := fieldSchema["required"].(bson.A); ok { + for _, r := range reqArr { + if s, ok := r.(string); ok { + nestedRequired[s] = true + } + } + } + c.Children, _ = jsonSchemaPropsToChildren(nestedProps, nestedRequired) + } + } + + if ct == schema.Array { + if items, ok := fieldSchema["items"].(bson.M); ok { + itemType, _ := resolveBsonType(items) + c.Children = []schema.Common{ + {Name: "element", Type: bsonTypeStringToCommon(itemType), Optional: true}, + } + } + } + + return c +} + +// resolveBsonType extracts the effective bsonType string from a field schema. +// It handles bsonType as a string or an array (union type). Returns the +// resolved type string and whether "null" was present in a union. +func resolveBsonType(fieldSchema bson.M) (string, bool) { + raw := fieldSchema["bsonType"] + switch v := raw.(type) { + case string: + return v, false + case bson.A: + var nonNull []string + hasNull := false + for _, elem := range v { + s, ok := elem.(string) + if !ok { + continue + } + if s == "null" { + hasNull = true + } else { + nonNull = append(nonNull, s) + } + } + if len(nonNull) == 1 { + return nonNull[0], hasNull + } + // Multiple non-null types or empty — fall back to Any. 
+ return "", hasNull + default: + return "", false + } +} + +// bsonTypeStringToCommon maps a $jsonSchema bsonType string to a +// schema.CommonType. +func bsonTypeStringToCommon(bsonType string) schema.CommonType { + switch bsonType { + case "bool": + return schema.Boolean + case "int": + return schema.Int32 + case "long": + return schema.Int64 + case "double": + return schema.Float64 + case "string": + return schema.String + case "binData": + return schema.ByteArray + case "date": + return schema.Timestamp + case "timestamp": + return schema.Timestamp + case "objectId": + return schema.String + case "decimal": + return schema.String + case "object": + return schema.Object + case "array": + return schema.Array + default: + return schema.Any + } +} + +// --------------------------------------------------------------------------- +// Tier 2: Document inference +// --------------------------------------------------------------------------- + +// inferSchemaFromDocument infers a schema.Common from a bson.M document and +// returns the serialised form (via ToAny()) along with sorted top-level keys. +func inferSchemaFromDocument(collectionName string, doc bson.M) (any, []string) { + keys := sortedMapKeys(doc) + children := make([]schema.Common, 0, len(keys)) + for _, k := range keys { + children = append(children, inferField(k, doc[k])) + } + c := schema.Common{ + Name: collectionName, + Type: schema.Object, + Optional: false, + Children: children, + } + return c.ToAny(), keys +} + +// inferField maps a single Go value (from BSON decoding) to a schema.Common. 
+func inferField(name string, val any) schema.Common { + c := schema.Common{ + Name: name, + Type: inferType(val), + Optional: true, + } + + switch v := val.(type) { + case bson.M: + keys := sortedMapKeys(v) + children := make([]schema.Common, 0, len(keys)) + for _, k := range keys { + children = append(children, inferField(k, v[k])) + } + c.Children = children + case bson.D: + m := make(bson.M, len(v)) + for _, elem := range v { + m[elem.Key] = elem.Value + } + keys := sortedMapKeys(m) + children := make([]schema.Common, 0, len(keys)) + for _, k := range keys { + children = append(children, inferField(k, m[k])) + } + c.Children = children + case bson.A: + if len(v) > 0 { + elemType := inferType(v[0]) + // If mixed types, fall back to Any. + for _, elem := range v[1:] { + if inferType(elem) != elemType { + elemType = schema.Any + break + } + } + c.Children = []schema.Common{ + {Name: "element", Type: elemType, Optional: true}, + } + } + } + + return c +} + +// inferType maps a Go value (from BSON decoding with DefaultDocumentM=true) to +// a schema.CommonType. 
+func inferType(val any) schema.CommonType { + switch val.(type) { + case bool: + return schema.Boolean + case int32: + return schema.Int32 + case int64: + return schema.Int64 + case float64: + return schema.Float64 + case string: + return schema.String + case bson.Binary: + return schema.ByteArray + case []byte: + return schema.ByteArray + case bson.DateTime: + return schema.Timestamp + case time.Time: + return schema.Timestamp + case bson.Timestamp: + return schema.Timestamp + case bson.ObjectID: + return schema.String + case bson.Decimal128: + return schema.String + case bson.M: + return schema.Object + case bson.D: + return schema.Object + case bson.A: + return schema.Array + case nil: + return schema.Any + default: + return schema.Any + } +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +// sortedMapKeys returns the keys of a bson.M sorted alphabetically. +func sortedMapKeys(m bson.M) []string { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + slices.Sort(keys) + return keys +} diff --git a/internal/impl/mongodb/cdc/schema_test.go b/internal/impl/mongodb/cdc/schema_test.go new file mode 100644 index 0000000000..1f07058931 --- /dev/null +++ b/internal/impl/mongodb/cdc/schema_test.go @@ -0,0 +1,356 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed as a Redpanda Enterprise file under the Redpanda Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
// You may obtain a copy of the License at
//
// https://github.com/redpanda-data/connect/v4/blob/main/licenses/rcl.md

package cdc

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.mongodb.org/mongo-driver/v2/bson"

	"github.com/redpanda-data/benthos/v4/public/schema"
)

// parseSchema is a test helper that round-trips a serialised schema through
// ParseFromAny and returns the result.
func parseSchema(t *testing.T, s any) schema.Common {
	t.Helper()
	require.NotNil(t, s)
	c, err := schema.ParseFromAny(s)
	require.NoError(t, err)
	return c
}

// childByName finds a child by name in a Common schema. It fails the test when
// the child is absent; the trailing return only satisfies the compiler.
func childByName(t *testing.T, c schema.Common, name string) schema.Common {
	t.Helper()
	for i := range c.Children {
		if c.Children[i].Name == name {
			return c.Children[i]
		}
	}
	t.Fatalf("child %q not found in %v", name, c.Children)
	return schema.Common{}
}

// ---------------------------------------------------------------------------
// Tier 1: $jsonSchema conversion
// ---------------------------------------------------------------------------

// TestBsonTypeStringToCommon covers the full bsonType -> CommonType mapping
// table, including the Any fallback for empty/unknown names.
func TestBsonTypeStringToCommon(t *testing.T) {
	tests := []struct {
		bsonType string
		expected schema.CommonType
	}{
		{"bool", schema.Boolean},
		{"int", schema.Int32},
		{"long", schema.Int64},
		{"double", schema.Float64},
		{"string", schema.String},
		{"binData", schema.ByteArray},
		{"date", schema.Timestamp},
		{"timestamp", schema.Timestamp},
		{"objectId", schema.String},
		{"decimal", schema.String},
		{"object", schema.Object},
		{"array", schema.Array},
		{"", schema.Any},
		{"unknown", schema.Any},
	}
	for _, tt := range tests {
		t.Run(tt.bsonType, func(t *testing.T) {
			assert.Equal(t, tt.expected, bsonTypeStringToCommon(tt.bsonType))
		})
	}
}

// TestSchemaFromJSONSchemaBasic checks a simple validator: required vs
// optional classification and the automatic injection of the _id field.
func TestSchemaFromJSONSchemaBasic(t *testing.T) {
	s, keys, err := schemaFromJSONSchema("test_coll", bson.M{
		"bsonType": "object",
		"required": bson.A{"name"},
		"properties": bson.M{
			"name": bson.M{"bsonType": "string"},
			"age":  bson.M{"bsonType": "int"},
		},
	})
	require.NoError(t, err)
	require.NotNil(t, s)
	assert.Equal(t, []string{"_id", "age", "name"}, keys) // _id auto-injected

	c := parseSchema(t, s)
	assert.Equal(t, "test_coll", c.Name)
	assert.Equal(t, schema.Object, c.Type)
	require.Len(t, c.Children, 3)

	// Sorted alphabetically, _id auto-injected first
	assert.Equal(t, "_id", c.Children[0].Name)
	assert.Equal(t, schema.String, c.Children[0].Type)
	assert.True(t, c.Children[0].Optional) // auto-injected

	assert.Equal(t, "age", c.Children[1].Name)
	assert.Equal(t, schema.Int32, c.Children[1].Type)
	assert.True(t, c.Children[1].Optional) // not in required

	assert.Equal(t, "name", c.Children[2].Name)
	assert.Equal(t, schema.String, c.Children[2].Type)
	assert.False(t, c.Children[2].Optional) // in required
}

// TestSchemaFromJSONSchemaBsonTypeArray covers union bsonType arrays:
// single-type-plus-null resolves the type and adds optionality, anything
// else collapses to Any.
func TestSchemaFromJSONSchemaBsonTypeArray(t *testing.T) {
	tests := []struct {
		name         string
		bsonType     bson.A
		expectedType schema.CommonType
		expectOptl   bool // additional optionality from null in array
	}{
		{"string_null", bson.A{"string", "null"}, schema.String, true},
		{"string_int", bson.A{"string", "int"}, schema.Any, false},
		{"null_only", bson.A{"null"}, schema.Any, true},
		{"empty", bson.A{}, schema.Any, false},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			s, _, err := schemaFromJSONSchema("coll", bson.M{
				"bsonType": "object",
				"properties": bson.M{
					"field": bson.M{"bsonType": tt.bsonType},
				},
			})
			require.NoError(t, err)
			c := parseSchema(t, s)
			f := childByName(t, c, "field")
			assert.Equal(t, tt.expectedType, f.Type)
			if tt.expectOptl {
				assert.True(t, f.Optional)
			}
		})
	}
}

// TestSchemaFromJSONSchemaNestedObject checks recursion into nested
// properties, including the nested "required" list.
func TestSchemaFromJSONSchemaNestedObject(t *testing.T) {
	s, _, err := schemaFromJSONSchema("coll", bson.M{
		"bsonType": "object",
		"properties": bson.M{
			"address": bson.M{
				"bsonType": "object",
				"required": bson.A{"city"},
				"properties": bson.M{
					"city":  bson.M{"bsonType": "string"},
					"zip":   bson.M{"bsonType": "string"},
					"alpha": bson.M{"bsonType": "int"},
				},
			},
		},
	})
	require.NoError(t, err)
	c := parseSchema(t, s)
	addr := childByName(t, c, "address")
	assert.Equal(t, schema.Object, addr.Type)
	require.Len(t, addr.Children, 3)
	// Sorted alphabetically
	assert.Equal(t, "alpha", addr.Children[0].Name)
	assert.Equal(t, "city", addr.Children[1].Name)
	assert.False(t, addr.Children[1].Optional)
	assert.Equal(t, "zip", addr.Children[2].Name)
	assert.True(t, addr.Children[2].Optional)
}

// TestSchemaFromJSONSchemaArrayWithItems checks that a typed "items" schema
// produces a single typed element child.
func TestSchemaFromJSONSchemaArrayWithItems(t *testing.T) {
	s, _, err := schemaFromJSONSchema("coll", bson.M{
		"bsonType": "object",
		"properties": bson.M{
			"tags": bson.M{
				"bsonType": "array",
				"items":    bson.M{"bsonType": "string"},
			},
		},
	})
	require.NoError(t, err)
	c := parseSchema(t, s)
	tags := childByName(t, c, "tags")
	assert.Equal(t, schema.Array, tags.Type)
	require.Len(t, tags.Children, 1)
	assert.Equal(t, schema.String, tags.Children[0].Type)
}

// TestSchemaFromJSONSchemaCombinatorField checks that any combinator keyword
// on a field maps that field to Any.
func TestSchemaFromJSONSchemaCombinatorField(t *testing.T) {
	for _, combinator := range []string{"oneOf", "anyOf", "allOf", "not"} {
		t.Run(combinator, func(t *testing.T) {
			s, _, err := schemaFromJSONSchema("coll", bson.M{
				"bsonType": "object",
				"properties": bson.M{
					"data": bson.M{combinator: bson.A{}},
				},
			})
			require.NoError(t, err)
			c := parseSchema(t, s)
			assert.Equal(t, schema.Any, childByName(t, c, "data").Type)
		})
	}
}

// TestSchemaFromJSONSchemaNoProperties checks that a validator without
// "properties" yields no schema (nil result, nil keys).
func TestSchemaFromJSONSchemaNoProperties(t *testing.T) {
	s, keys, err := schemaFromJSONSchema("coll", bson.M{
		"bsonType": "object",
		"oneOf":    bson.A{},
	})
	require.NoError(t, err)
	assert.Nil(t, s)
	assert.Nil(t, keys)
}

// ---------------------------------------------------------------------------
// Tier 2: Document inference
// ---------------------------------------------------------------------------

// TestInferSchemaFromDocumentTypes covers the BSON-to-common type mapping for
// every supported decoded value type, plus the Any fallback for nil.
func TestInferSchemaFromDocumentTypes(t *testing.T) {
	doc := bson.M{
		"bool_field":    true,
		"int32_field":   int32(42),
		"int64_field":   int64(99),
		"float64_field": 3.14,
		"string_field":  "hello",
		"binary_field":  bson.Binary{Data: []byte("data")},
		"date_field":    bson.DateTime(time.Now().UnixMilli()),
		"ts_field":      bson.Timestamp{T: 1, I: 1},
		"oid_field":     bson.ObjectID{},
		"dec_field":     bson.Decimal128{},
		"nested_field":  bson.M{"x": int32(1)},
		"array_field":   bson.A{"a", "b"},
		"nil_field":     nil,
	}

	s, keys := inferSchemaFromDocument("coll", doc)
	require.NotNil(t, s)
	assert.Len(t, keys, 13)

	c := parseSchema(t, s)
	assert.Equal(t, schema.Object, c.Type)
	require.Len(t, c.Children, 13)

	expectations := map[string]schema.CommonType{
		"array_field":   schema.Array,
		"binary_field":  schema.ByteArray,
		"bool_field":    schema.Boolean,
		"date_field":    schema.Timestamp,
		"dec_field":     schema.String,
		"float64_field": schema.Float64,
		"int32_field":   schema.Int32,
		"int64_field":   schema.Int64,
		"nested_field":  schema.Object,
		"nil_field":     schema.Any,
		"oid_field":     schema.String,
		"string_field":  schema.String,
		"ts_field":      schema.Timestamp,
	}
	for _, child := range c.Children {
		expected, ok := expectations[child.Name]
		require.True(t, ok, "unexpected child: %s", child.Name)
		assert.Equal(t, expected, child.Type, "wrong type for %s", child.Name)
		// All inferred fields are optional by construction.
		assert.True(t, child.Optional, "%s should be optional", child.Name)
	}
}

// TestInferSchemaFromDocumentNestedChildren checks that nested document
// children are sorted alphabetically.
func TestInferSchemaFromDocumentNestedChildren(t *testing.T) {
	doc := bson.M{
		"outer": bson.M{
			"zebra": "z",
			"alpha": int32(1),
		},
	}
	s, _ := inferSchemaFromDocument("coll", doc)
	c := parseSchema(t, s)
	outer := childByName(t, c, "outer")
	assert.Equal(t, schema.Object, outer.Type)
	require.Len(t, outer.Children, 2)
	assert.Equal(t, "alpha", outer.Children[0].Name)
	assert.Equal(t, "zebra", outer.Children[1].Name)
}

// TestInferSchemaFromDocumentMixedArray checks that mixed-type arrays produce
// an Any element type.
func TestInferSchemaFromDocumentMixedArray(t *testing.T) {
	doc := bson.M{"mixed": bson.A{"string", int32(42)}}
	s, _ := inferSchemaFromDocument("coll", doc)
	c := parseSchema(t, s)
	mixed := childByName(t, c, "mixed")
	assert.Equal(t, schema.Array, mixed.Type)
	require.Len(t, mixed.Children, 1)
	assert.Equal(t, schema.Any, mixed.Children[0].Type)
}

// TestInferSchemaFromDocumentEmpty checks the empty-document edge case: an
// object schema with no children and no keys.
func TestInferSchemaFromDocumentEmpty(t *testing.T) {
	s, keys := inferSchemaFromDocument("coll", bson.M{})
	c := parseSchema(t, s)
	assert.Equal(t, schema.Object, c.Type)
	assert.Empty(t, c.Children)
	assert.Empty(t, keys)
}

// ---------------------------------------------------------------------------
// Deterministic ordering
// ---------------------------------------------------------------------------

// TestInferSchemaFieldOrdering runs inference repeatedly because Go map
// iteration order is randomised; any nondeterminism in key handling would
// surface as a differing field order across iterations.
func TestInferSchemaFieldOrdering(t *testing.T) {
	doc := bson.M{
		"zulu":  "z",
		"alpha": "a",
		"mike":  "m",
		"bravo": "b",
	}

	// Run multiple times to catch map iteration non-determinism.
	var prev []string
	for range 20 {
		s, keys := inferSchemaFromDocument("coll", doc)
		c := parseSchema(t, s)

		names := make([]string, len(c.Children))
		for i, ch := range c.Children {
			names[i] = ch.Name
		}
		assert.Equal(t, []string{"alpha", "bravo", "mike", "zulu"}, names)
		assert.Equal(t, []string{"alpha", "bravo", "mike", "zulu"}, keys)
		if prev != nil {
			assert.Equal(t, prev, names, "field ordering should be deterministic across iterations")
		}
		prev = names
	}
}

// TestSchemaFromJSONSchemaFieldOrdering checks the same determinism for the
// validator-based path, including the auto-injected _id.
func TestSchemaFromJSONSchemaFieldOrdering(t *testing.T) {
	props := bson.M{
		"zulu":  bson.M{"bsonType": "string"},
		"alpha": bson.M{"bsonType": "int"},
		"mike":  bson.M{"bsonType": "bool"},
	}
	for range 20 {
		s, keys, err := schemaFromJSONSchema("coll", bson.M{
			"bsonType":   "object",
			"properties": props,
		})
		require.NoError(t, err)
		c := parseSchema(t, s)
		names := make([]string, len(c.Children))
		for i, ch := range c.Children {
			names[i] = ch.Name
		}
		assert.Equal(t, []string{"_id", "alpha", "mike", "zulu"}, names)
		assert.Equal(t, []string{"_id", "alpha", "mike", "zulu"}, keys)
	}
}

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

// TestSortedMapKeys checks alphabetical key sorting of a bson.M.
func TestSortedMapKeys(t *testing.T) {
	m := bson.M{"z": 1, "a": 2, "m": 3}
	assert.Equal(t, []string{"a", "m", "z"}, sortedMapKeys(m))
}