Add multi tables support and discovery #4076

Merged
squiidz merged 1 commit into main from dynamo_cdc_multi_table
Mar 12, 2026

Conversation

squiidz (Contributor) commented Mar 9, 2026

No description provided.

@squiidz squiidz force-pushed the dynamo_cdc_multi_table branch 3 times, most recently from 3c76bb7 to 2dddc58 Compare March 9, 2026 14:47

claude bot commented Mar 9, 2026

Commits
LGTM

Review
Thorough review of the multi-table streaming and tag-based discovery feature for DynamoDB CDC. The implementation is well-structured with correct lock hierarchy (documented and followed), proper shutdown coordination across single-table and multi-table code paths, and mathematically equivalent deduplication buffer behavior after the CAS-to-Add refactor. Config parsing, error handling, context propagation, and test patterns all follow project conventions.

LGTM

@redpanda-data redpanda-data deleted a comment from claude bot Mar 9, 2026
@squiidz squiidz force-pushed the dynamo_cdc_multi_table branch from 2dddc58 to a916a6f Compare March 10, 2026 13:45
Comment on lines +2349 to +2352
	tableName := ""
	if len(d.conf.tables) > 0 {
		tableName = d.conf.tables[0]
	}

Bug: When tag discovery finds exactly 1 table, Connect routes to connectSingleTable (the single-table code path). However, convertRecordsToBatch reads the table name from d.conf.tables[0], which is the config value — empty for tag mode (tables: []). This means all CDC messages from a tag-discovered single table will have an empty tableName in both the record data and dynamodb_table metadata.

The discovered table name is passed as a parameter to connectSingleTable but is never stored on the struct for convertRecordsToBatch to use later.

Fix: store the resolved table name (from connectSingleTable's tableName parameter) on the struct so convertRecordsToBatch can use it, or refactor convertRecordsToBatch to accept tableName as a parameter like convertTableRecordsToBatch does.
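The second option in the fix above (passing the table name as a parameter) might look roughly like the sketch below. The `record` and `message` types and the `toBatch` helper are hypothetical stand-ins, not the connector's real types or signatures; the only point is that the caller supplies the resolved name, so it no longer matters whether it came from config or from tag discovery.

```go
package main

// record is a hypothetical stand-in for a DynamoDB stream record.
type record struct {
	EventName string
}

// message is a hypothetical stand-in for an output message; TableName
// plays the role of the dynamodb_table metadata field.
type message struct {
	EventName string
	TableName string
}

// toBatch receives the resolved table name from its caller instead of
// reading the first entry of the configured table list, which is empty
// in tag discovery mode.
func toBatch(tableName string, records []record) []message {
	batch := make([]message, 0, len(records))
	for _, r := range records {
		batch = append(batch, message{EventName: r.EventName, TableName: tableName})
	}
	return batch
}
```

With this shape, the single-table path would forward the name it was given when connecting, so a table found by tag discovery is labelled the same way as a configured one.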

	maxSize int
	totalCount atomic.Int64
	overflow atomic.Bool
	shards [32]bufferShard // 32 independent shards with separate locks

The previous code defined const numBufferShards = 32 and used it consistently. This PR removes that constant and uses the literal 32 in multiple places (here, getShard, newSnapshotSequenceBuffer). Per project patterns: "Name all numeric constants. Every literal number in logic must have a clear meaning through a named constant or variable."

Fix: restore a named constant (e.g. numBufferShards) and use it in all three locations.
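As a minimal sketch of that fix, the constant can size the array, drive initialization, and bound the shard index, so the three locations cannot drift apart. Everything except the name `numBufferShards` is assumed for illustration: the `sequenceBuffer` type, the `seen` map, and the FNV hash are not taken from the PR.

```go
package main

import (
	"hash/fnv"
	"sync"
)

// numBufferShards is the single named source of truth for the shard count.
const numBufferShards = 32

// bufferShard is a hypothetical shard: a lock plus the keys it has seen.
type bufferShard struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

// sequenceBuffer sizes its shard array from the constant, not a literal.
type sequenceBuffer struct {
	shards [numBufferShards]bufferShard
}

// newSequenceBuffer initializes every shard; ranging over the array means
// the loop bound follows the constant automatically.
func newSequenceBuffer() *sequenceBuffer {
	b := &sequenceBuffer{}
	for i := range b.shards {
		b.shards[i].seen = make(map[string]struct{})
	}
	return b
}

// getShard maps a key to a shard using the same constant as the modulus.
// The FNV hash is an assumption made for this sketch.
func (b *sequenceBuffer) getShard(key string) *bufferShard {
	h := fnv.New32a()
	h.Write([]byte(key)) // hash.Hash writes never return an error
	return &b.shards[h.Sum32()%numBufferShards]
}
```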


claude bot commented Mar 10, 2026

Commits
LGTM

Review
Adds multi-table streaming and tag-based discovery to the DynamoDB CDC input. Well-structured lock hierarchy, proper shutdown coordination, and good integration test coverage.

  1. Bug — empty table name in single-table CDC path for tag discovery: When tag discovery finds exactly 1 table, the single-table code path is used, but convertRecordsToBatch reads the table name from d.conf.tables[0] (empty for tag mode), resulting in empty tableName in record data and dynamodb_table metadata.
  2. Magic number 32 replaces removed constant: The named constant numBufferShards was removed and replaced with literal 32 in multiple places, violating the project's "name all numeric constants" rule.

@squiidz squiidz force-pushed the dynamo_cdc_multi_table branch from a916a6f to 68a0f23 Compare March 10, 2026 14:27
Comment on lines +1568 to +1577
	// Update active shards metric
	activeCount := 0
	ts.mu.RLock()
	for shardID := range activeShards {
		reader, exists := ts.shardReaders[shardID]
		if exists && !reader.exhausted {
			activeCount++
		}
	}
	ts.mu.RUnlock()

Bug: activeCount is computed but never used — the dynamodb_cdc_shards_active metric is never updated in multi-table mode. The single-table startShardCoordinator calls d.metrics.shardsActive.Set(int64(activeCount)) after the same computation (line 1466), but this multi-table equivalent is missing the metric update.

Add d.metrics.shardsActive.Set(int64(activeCount)) after the loop (or aggregate across all table coordinators if the gauge should represent the total).
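The aggregation option mentioned above could be sketched as follows, assuming the gauge should report the total across all tables. The `gauge`, `shardReader`, `countActive`, and `reportActiveShards` names are hypothetical, and the read lock from the reviewed snippet is left out to keep the sketch short.

```go
package main

import "sync/atomic"

// gauge is a hypothetical stand-in for the shardsActive metric.
type gauge struct{ v atomic.Int64 }

func (g *gauge) Set(n int64)  { g.v.Store(n) }
func (g *gauge) Value() int64 { return g.v.Load() }

// shardReader is a hypothetical stand-in for a per-shard reader.
type shardReader struct{ exhausted bool }

// countActive mirrors the loop in the reviewed snippet: a shard counts as
// active only if it has a reader and that reader is not exhausted.
func countActive(active map[string]struct{}, readers map[string]*shardReader) int {
	n := 0
	for id := range active {
		if r, ok := readers[id]; ok && !r.exhausted {
			n++
		}
	}
	return n
}

// reportActiveShards publishes the sum of the per-table counts, so the
// computed value is actually emitted rather than discarded.
func reportActiveShards(g *gauge, perTable map[string]int) {
	total := 0
	for _, n := range perTable {
		total += n
	}
	g.Set(int64(total))
}
```

If each table coordinator called `Set` with only its own count, the last writer would win and the gauge would under-report, which is why the sketch sums before publishing.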


claude bot commented Mar 10, 2026

Commits
LGTM

Review
Large, well-structured PR adding multi-table streaming and tag-based discovery to the DynamoDB CDC input. The lock hierarchy is well-documented, single-table vs multi-table path selection is clean, and the new integration/unit tests are thorough.

  1. Bug: Missing shardsActive metric update in multi-table coordinator: activeCount is computed but never emitted in startTableStreamCoordinator, unlike the single-table startShardCoordinator, which calls d.metrics.shardsActive.Set(...). See inline comment.

@squiidz squiidz force-pushed the dynamo_cdc_multi_table branch from 68a0f23 to fe62992 Compare March 10, 2026 14:52
// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md

//go:build integration


Project test pattern violation: The //go:build integration build tag is explicitly prohibited by project test conventions:

Do not use build tags. Use integration.CheckSkip(t) at the start of every integration test function.

Both benchmark functions already call integration.CheckSkip(b), so the build tag is redundant. Remove the //go:build integration line.


claude bot commented Mar 10, 2026

Commits
LGTM

Review
Large, well-structured PR adding multi-table streaming and tag-based discovery to the DynamoDB CDC input. The lock hierarchy is clearly documented and correctly followed, the single-table vs multi-table code path split is clean, and the shutdown/cleanup logic properly tracks background workers. Config validation, tag filter parsing, and deduplication buffer changes are solid. New unit and integration tests provide good coverage of the added functionality.

  1. input_cdc_bench_test.go:10: the //go:build integration build tag violates the project test pattern. Rely on integration.CheckSkip only (both benchmarks already call it) and remove the build tag.

@squiidz squiidz force-pushed the dynamo_cdc_multi_table branch from fe62992 to e159997 Compare March 10, 2026 15:07

claude bot commented Mar 10, 2026

Commits
LGTM

Review
Large PR adding multi-table streaming support and tag-based table discovery to the DynamoDB CDC input. The code follows project patterns well: field name constants use the dci prefix convention, ParsedConfig extraction uses named returns with bare return, error wrapping uses gerund form ("discovering tables: %w"), context.Background() is only used in factory functions and SoftStopCtx for background goroutines (not in component methods with available ctx), and the lock hierarchy is clearly documented. Tests are comprehensive with unit tests for tag parsing/matching logic, integration tests for multi-table streaming, and benchmarks. New integration tests properly use integration.CheckSkip(t), testcontainers-go, and context.Background() in cleanup functions.

LGTM

@squiidz squiidz force-pushed the dynamo_cdc_multi_table branch from e159997 to f026677 Compare March 10, 2026 15:27
- `+"`dynamodb_cdc_shards_active`"+` - Number of shards currently being read from (gauge)
- `+"`dynamodb_cdc_snapshot_state`"+` - Snapshot state: 0=not_started, 1=in_progress, 2=complete (gauge)
- `+"`dynamodb_cdc_snapshot_records_read`"+` - Total records read during snapshot (counter)
- `+"`dynamodb_cdc_snapshot_segments_active`"+` - Number of active snapshot scan segments (gauge)

Bug: Documented metrics removed but still emitted. Three metrics were removed from the documented metrics list here, but the code still actively registers and emits them:

  • dynamodb_cdc_snapshot_buffer_overflow — still incremented at line 2206
  • dynamodb_cdc_snapshot_segment_duration — still timed at line 1215
  • dynamodb_cdc_checkpoint_failures — still incremented at lines 1210 and 1909

These metrics should be re-added to the documentation, or if the intent was to remove them, their registration and emission code should be removed too.


claude bot commented Mar 10, 2026

Commits
LGTM

Review
Large, well-structured PR adding multi-table streaming and tag-based discovery to the DynamoDB CDC input. The lock hierarchy documentation, double-check locking in initializeTableStream, and background worker lifecycle management are solid. Code follows project patterns (field name constants, error wrapping, context propagation).

  1. Three metrics (dynamodb_cdc_snapshot_buffer_overflow, dynamodb_cdc_snapshot_segment_duration, dynamodb_cdc_checkpoint_failures) were removed from the documented metrics list but are still actively registered and emitted by the code. Either re-add them to the docs or remove the emission code.

@squiidz squiidz force-pushed the dynamo_cdc_multi_table branch from f026677 to dc7fe4c Compare March 10, 2026 15:43
Comment on lines +1199 to +1209
secret: xxxxx
token: xxxxx
`, tagKey, tagValue, checkpointTable, port)

spec := dynamoDBCDCInputConfig()
parsed, err := spec.ParseYAML(confStr, nil)
require.NoError(t, err)

input, err := newDynamoDBCDCInputFromConfig(parsed, service.MockResources())
require.NoError(t, err)


Bug: dataMap["value"] accesses the top-level structured message, but the CDC message nests item data under dynamodb.newImage. The value key doesn't exist at the top level, so hasValue is always false and the value-matching assertions never execute.

To actually verify data isolation, the code needs to traverse into the nested structure: dataMap["dynamodb"].(map[string]any)["newImage"].(map[string]any)["value"].

As-is, the test only validates that events arrive from both tables (via eventsByTable counts), but the per-table value assertions are dead code.
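One way to make the nested lookup explicit, and to have a missing key fail loudly instead of silently skipping the assertions, is a small path helper. This is a sketch only: `nestedValue` is a hypothetical helper, and the message shape is assumed from the comment above (item data under `dynamodb.newImage`).

```go
package main

// nestedValue walks a chain of keys through nested map[string]any values.
// It returns false as soon as a key is missing or an intermediate value is
// not a map, so callers can assert on the result instead of skipping.
func nestedValue(root map[string]any, path ...string) (any, bool) {
	var cur any = root
	for _, key := range path {
		m, ok := cur.(map[string]any)
		if !ok {
			return nil, false
		}
		cur, ok = m[key]
		if !ok {
			return nil, false
		}
	}
	return cur, true
}
```

In the isolation test, the lookup would then be `nestedValue(dataMap, "dynamodb", "newImage", "value")`, followed by a hard requirement that the second return value is true, so the per-table assertions can no longer be bypassed.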


claude bot commented Mar 10, 2026

Commits
LGTM

Review
Large, well-structured PR adding multi-table streaming and tag-based discovery to the DynamoDB CDC input. Code follows project patterns (field name constants, error wrapping, lock hierarchy documentation, config validation). The single-table vs multi-table code paths are cleanly separated, and the new integration tests cover includelist mode, tag discovery, and table isolation.

  1. Dead value-isolation assertions in testIsolationBetweenTables: The test checks dataMap["value"] at the top level of the CDC message, but item data is nested under dynamodb.newImage. The assertions that verify each table returns its own value never execute, so data isolation is not actually validated.

@squiidz squiidz force-pushed the dynamo_cdc_multi_table branch from dc7fe4c to 7f952b6 Compare March 10, 2026 15:55

claude bot commented Mar 10, 2026

Commits
LGTM

Review
This PR extends the DynamoDB CDC input to support multi-table streaming with automatic tag-based discovery. The implementation cleanly separates single-table (legacy) and multi-table code paths, maintains proper lock hierarchy (documented in comments), and includes comprehensive test coverage (unit tests for tag filter parsing/matching, integration tests for includelist mode, table isolation, and tag discovery, plus throughput benchmarks).

LGTM

Extend the DynamoDB CDC input to stream from multiple tables
simultaneously, with automatic table discovery.

Changes:
- Replace single `table` field with `tables` list field
- Add `table_discovery_mode` with three modes:
  - `single` (default): stream from one table (backward compatible)
  - `includelist`: stream from an explicit list of tables
  - `tag`: auto-discover tables by DynamoDB tags using `table_tag_filter`
- Add `table_discovery_interval` for periodic rescanning (default 5m)
- Each table maintains independent shard tracking and checkpoint state
- Add multi-tag filter syntax: "key1:v1,v2;key2:v3,v4" for AND/OR
  tag matching
- Add integration tests for multi-table streaming and tag discovery
- Add end-to-end throughput benchmarks for CDC and snapshot modes

This enables use cases where users need to capture changes from many
DynamoDB tables (e.g. per-tenant tables) without deploying separate
connectors for each, and allows new tables to be picked up automatically
via tag-based discovery.
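
To illustrate the multi-tag filter syntax from the commit message, here is a minimal sketch of parsing and matching. It assumes that `;` separates clauses that must all match (AND) and that `,` separates alternative values within a clause (OR); the commit message only says "AND/OR", so that reading is an assumption. `parseTagFilter` and `matchesTags` are hypothetical names, not the PR's functions.

```go
package main

import (
	"fmt"
	"strings"
)

// parseTagFilter turns "key1:v1,v2;key2:v3,v4" into a map from tag key to
// its accepted values. Assumed semantics: clauses are ANDed, values ORed.
func parseTagFilter(s string) (map[string][]string, error) {
	filter := make(map[string][]string)
	for _, clause := range strings.Split(s, ";") {
		key, vals, ok := strings.Cut(clause, ":")
		key = strings.TrimSpace(key)
		if !ok || key == "" || strings.TrimSpace(vals) == "" {
			return nil, fmt.Errorf("parsing tag filter clause %q: want key:v1,v2", clause)
		}
		for _, v := range strings.Split(vals, ",") {
			filter[key] = append(filter[key], strings.TrimSpace(v))
		}
	}
	return filter, nil
}

// matchesTags reports whether a table's tags satisfy every clause: each
// filter key must be present with one of its accepted values.
func matchesTags(filter map[string][]string, tags map[string]string) bool {
	for key, allowed := range filter {
		got, ok := tags[key]
		if !ok {
			return false
		}
		found := false
		for _, v := range allowed {
			if v == got {
				found = true
				break
			}
		}
		if !found {
			return false
		}
	}
	return true
}
```

Under these assumptions, the filter `env:prod,staging;team:data` would select a table tagged `env=staging, team=data` and reject one tagged `env=prod, team=web`.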
@squiidz squiidz force-pushed the dynamo_cdc_multi_table branch from 7f952b6 to f8c3454 Compare March 12, 2026 14:07

claude bot commented Mar 12, 2026

Commits
LGTM

Review
Thorough review of the multi-table streaming and tag-based discovery implementation for aws_dynamodb_cdc. The PR adds a well-structured multi-table code path alongside the existing single-table path, with proper lock hierarchy documentation, double-check locking for concurrent table initialization, config validation to prevent unsupported combinations (e.g., snapshot + multi-table), and comprehensive tests (unit, integration, benchmarks). The shutdown logic is correctly handled across all modes via backgroundWorkers tracking and watcher goroutines.

LGTM

@squiidz squiidz merged commit 8eb74e9 into main Mar 12, 2026
7 checks passed
@squiidz squiidz deleted the dynamo_cdc_multi_table branch March 12, 2026 14:29

2 participants