Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 108 additions & 19 deletions docs/modules/components/pages/inputs/aws_dynamodb_cdc.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ Common::
input:
label: ""
aws_dynamodb_cdc:
table: "" # No default (required)
tables: []
checkpoint_table: redpanda_dynamodb_checkpoints
start_from: trim_horizon
snapshot_mode: none
Expand All @@ -55,7 +55,10 @@ Advanced::
input:
label: ""
aws_dynamodb_cdc:
table: "" # No default (required)
tables: []
table_discovery_mode: single
table_tag_filter: ""
table_discovery_interval: 5m
checkpoint_table: redpanda_dynamodb_checkpoints
batch_size: 1000
poll_interval: 1s
Expand All @@ -67,7 +70,6 @@ input:
snapshot_segments: 1
snapshot_batch_size: 100
snapshot_throttle: 100ms
snapshot_max_backoff: 0s
snapshot_deduplicate: true
snapshot_buffer_size: 100000
region: "" # No default (optional)
Expand Down Expand Up @@ -100,10 +102,21 @@ DynamoDB Streams capture item-level changes in DynamoDB tables. This input suppo
- Checkpoint-based resumption after restarts
- Concurrent processing of multiple shards
- Optional initial snapshot of existing table data
- Multi-table streaming with auto-discovery by tags or explicit table lists

### Table Discovery Modes

This input supports three table discovery modes:

- `single` (default) - Stream from a single table specified in the `tables` field
- `tag` - Auto-discover and stream from multiple tables based on DynamoDB table tags. Use `table_tag_filter` to filter tables (e.g. `key:value`)
- `includelist` - Stream from an explicit list of tables specified in the `tables` field

When using `tag` or `includelist` mode, the connector will stream from all matching tables simultaneously. Each table maintains its own checkpoint state. Use `table_discovery_interval` to periodically rescan for new tables (useful for dynamically tagged tables).

### Prerequisites

The source DynamoDB table must have streams enabled. You can enable streams with one of these view types:
The source DynamoDB table(s) must have streams enabled. You can enable streams with one of these view types:

- `KEYS_ONLY` - Only the key attributes of the modified item
- `NEW_IMAGE` - The entire item as it appears after the modification
Expand Down Expand Up @@ -166,7 +179,7 @@ Read change events from a DynamoDB table with streams enabled.
```yaml
input:
aws_dynamodb_cdc:
table: my-table
tables: [my-table]
region: us-east-1
```

Expand All @@ -180,7 +193,7 @@ Only process new changes, ignoring existing stream data.
```yaml
input:
aws_dynamodb_cdc:
table: orders
tables: [orders]
start_from: latest
region: us-west-2
```
Expand All @@ -195,24 +208,109 @@ Scan all existing records, then stream ongoing changes.
```yaml
input:
aws_dynamodb_cdc:
table: products
tables: [products]
snapshot_mode: snapshot_and_cdc
snapshot_segments: 5
region: us-east-1
```

--
Auto-discover tables by tag::
+
--

Automatically discover and stream from all tables with a specific tag.

```yaml
input:
aws_dynamodb_cdc:
table_discovery_mode: tag
table_tag_filter: "stream-enabled:true"
table_discovery_interval: 5m
region: us-east-1
```

--
Auto-discover tables by multiple tags::
+
--

Discover tables matching multiple tag criteria with OR logic per key, AND logic across keys.

```yaml
input:
aws_dynamodb_cdc:
table_discovery_mode: tag
table_tag_filter: "environment:prod,staging;team:data,analytics"
table_discovery_interval: 5m
region: us-east-1
# Matches tables with: (environment=prod OR environment=staging) AND (team=data OR team=analytics)
```

--
Stream from multiple specific tables::
+
--

Stream from an explicit list of tables simultaneously.

```yaml
input:
aws_dynamodb_cdc:
table_discovery_mode: includelist
tables:
- orders
- customers
- products
region: us-west-2
```

--
======

== Fields

=== `table`
=== `tables`

List of table names to stream from. For single table mode, provide one table. For multi-table mode, provide multiple tables.


*Type*: `array`

*Default*: `[]`

=== `table_discovery_mode`

Table discovery mode. `single`: stream from tables specified in `tables` list. `tag`: auto-discover tables by tags (ignores `tables` field). `includelist`: stream from tables in `tables` list (alias for `single`, kept for compatibility).


*Type*: `string`

*Default*: `"single"`

Options:
`single`
, `tag`
, `includelist`
.

=== `table_tag_filter`

Multi-tag filter: 'key1:v1,v2;key2:v3,v4'. Matches tables with (key1=v1 OR key1=v2) AND (key2=v3 OR key2=v4). Required when `table_discovery_mode` is `tag`.


*Type*: `string`

*Default*: `""`

The name of the DynamoDB table to read streams from.
=== `table_discovery_interval`

Interval for rescanning and discovering new tables when using `tag` or `includelist` mode. Set to 0 to disable periodic rescanning.


*Type*: `string`

*Default*: `"5m"`

=== `checkpoint_table`

Expand Down Expand Up @@ -299,7 +397,7 @@ Options:

=== `snapshot_segments`

Number of parallel DynamoDB Scan segments. Each segment scans a portion of the table concurrently, increasing throughput at the cost of more provisioned read capacity. Higher values consume more RCUs. Experiment to find the optimal value for your table.
Number of parallel scan segments (1-10). Higher parallelism scans faster but consumes more RCUs. Start with 1 for safety.


*Type*: `int`
Expand All @@ -324,15 +422,6 @@ Minimum time between scan requests per segment. Use this to limit RCU consumptio

*Default*: `"100ms"`

=== `snapshot_max_backoff`

Maximum total time to retry throttled snapshot scan requests before giving up. Set to 0 for unlimited retries.


*Type*: `string`

*Default*: `"0s"`

=== `snapshot_deduplicate`

Deduplicate records that appear in both snapshot and CDC stream. Requires buffering CDC events during snapshot. If buffer is exceeded, deduplication is disabled to prevent data loss.
Expand Down
Loading
Loading