Conversation
3c76bb7 to
2dddc58
Compare
|
Commits Review LGTM |
2dddc58 to
a916a6f
Compare
| tableName := "" | ||
| if len(d.conf.tables) > 0 { | ||
| tableName = d.conf.tables[0] | ||
| } |
There was a problem hiding this comment.
Bug: When tag discovery finds exactly 1 table, Connect routes to connectSingleTable (the single-table code path). However, convertRecordsToBatch reads the table name from d.conf.tables[0], which is the config value — empty for tag mode (tables: []). This means all CDC messages from a tag-discovered single table will have an empty tableName in both the record data and dynamodb_table metadata.
The discovered table name is passed as a parameter to connectSingleTable but is never stored on the struct for convertRecordsToBatch to use later.
Fix: store the resolved table name (from connectSingleTable's tableName parameter) on the struct so convertRecordsToBatch can use it, or refactor convertRecordsToBatch to accept tableName as a parameter like convertTableRecordsToBatch does.
| maxSize int | ||
| totalCount atomic.Int64 | ||
| overflow atomic.Bool | ||
| shards [32]bufferShard // 32 independent shards with separate locks |
There was a problem hiding this comment.
The previous code defined const numBufferShards = 32 and used it consistently. This PR removes that constant and uses the literal 32 in multiple places (here, getShard, newSnapshotSequenceBuffer). Per project patterns: "Name all numeric constants. Every literal number in logic must have a clear meaning through a named constant or variable."
Fix: restore a named constant (e.g. numBufferShards) and use it in all three locations.
|
Commits Review
|
a916a6f to
68a0f23
Compare
| // Update active shards metric | ||
| activeCount := 0 | ||
| ts.mu.RLock() | ||
| for shardID := range activeShards { | ||
| reader, exists := ts.shardReaders[shardID] | ||
| if exists && !reader.exhausted { | ||
| activeCount++ | ||
| } | ||
| } | ||
| ts.mu.RUnlock() |
There was a problem hiding this comment.
Bug: activeCount is computed but never used — the dynamodb_cdc_shards_active metric is never updated in multi-table mode. The single-table startShardCoordinator calls d.metrics.shardsActive.Set(int64(activeCount)) after the same computation (line 1466), but this multi-table equivalent is missing the metric update.
Add d.metrics.shardsActive.Set(int64(activeCount)) after the loop (or aggregate across all table coordinators if the gauge should represent the total).
|
Commits Review
|
68a0f23 to
fe62992
Compare
| // https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md | ||
|
|
||
| //go:build integration | ||
|
|
There was a problem hiding this comment.
Project test pattern violation: The //go:build integration build tag is explicitly prohibited by project test conventions:
Do not use build tags. Use
integration.CheckSkip(t)at the start of every integration test function.
Both benchmark functions already call integration.CheckSkip(b), so the build tag is redundant. Remove the //go:build integration line.
|
Commits Review
|
fe62992 to
e159997
Compare
|
Commits Review LGTM |
e159997 to
f026677
Compare
| - `+"`dynamodb_cdc_shards_active`"+` - Number of shards currently being read from (gauge) | ||
| - `+"`dynamodb_cdc_snapshot_state`"+` - Snapshot state: 0=not_started, 1=in_progress, 2=complete (gauge) | ||
| - `+"`dynamodb_cdc_snapshot_records_read`"+` - Total records read during snapshot (counter) | ||
| - `+"`dynamodb_cdc_snapshot_segments_active`"+` - Number of active snapshot scan segments (gauge) |
There was a problem hiding this comment.
Bug: Documented metrics removed but still emitted. Three metrics were removed from the documented metrics list here, but the code still actively registers and emits them:
dynamodb_cdc_snapshot_buffer_overflow— still incremented at line 2206dynamodb_cdc_snapshot_segment_duration— still timed at line 1215dynamodb_cdc_checkpoint_failures— still incremented at lines 1210 and 1909
These metrics should be re-added to the documentation, or if the intent was to remove them, their registration and emission code should be removed too.
|
Commits Review
|
f026677 to
dc7fe4c
Compare
| secret: xxxxx | ||
| token: xxxxx | ||
| `, tagKey, tagValue, checkpointTable, port) | ||
|
|
||
| spec := dynamoDBCDCInputConfig() | ||
| parsed, err := spec.ParseYAML(confStr, nil) | ||
| require.NoError(t, err) | ||
|
|
||
| input, err := newDynamoDBCDCInputFromConfig(parsed, service.MockResources()) | ||
| require.NoError(t, err) | ||
|
|
There was a problem hiding this comment.
Bug: dataMap["value"] accesses the top-level structured message, but the CDC message nests item data under dynamodb.newImage. The value key doesn't exist at the top level, so hasValue is always false and the value-matching assertions never execute.
To actually verify data isolation, the code needs to traverse into the nested structure: dataMap["dynamodb"].(map[string]any)["newImage"].(map[string]any)["value"].
As-is, the test only validates that events arrive from both tables (via eventsByTable counts), but the per-table value assertions are dead code.
|
Commits Review
|
dc7fe4c to
7f952b6
Compare
|
Commits Review LGTM |
Extend the DynamoDB CDC input to stream from multiple tables simultaneously, with automatic table discovery. Changes: - Replace single `table` field with `tables` list field - Add `table_discovery_mode` with three modes: - `single` (default): stream from one table (backward compatible) - `includelist`: stream from an explicit list of tables - `tag`: auto-discover tables by DynamoDB tags using `table_tag_filter` - Add `table_discovery_interval` for periodic rescanning (default 5m) - Each table maintains independent shard tracking and checkpoint state - Add multi-tag filter syntax: "key1:v1,v2;key2:v3,v4" for AND/OR tag matching - Add integration tests for multi-table streaming and tag discovery - Add end-to-end throughput benchmarks for CDC and snapshot modes This enables use cases where users need to capture changes from many DynamoDB tables (e.g. per-tenant tables) without deploying separate connectors for each, and allows new tables to be picked up automatically via tag-based discovery.
7f952b6 to
f8c3454
Compare
|
Commits Review LGTM |
No description provided.