
Commit 5000786

Kafka: Expand default_msg_processor into a miniature decoding unit
- Accept decoding options per `KafkaDecodingOptions`
- Provide output formatting options per `KafkaEvent`
- Tie both elements together using `KafkaEventProcessor`

The machinery is effectively the same as before, but provides a few more options: type decoding for the Kafka event's key/value slots, a selection mechanism to limit the output to specific fields only, and a small projection mechanism to optionally drill down into a specific field.

In combination, these decoding options allow users to relay JSON-encoded Kafka event values directly into a destination table, without any metadata wrappings.

The output formatter provides three different variants out of the box. More variants can be added in the future, as other users or use cases may have different requirements in the same area.

Most importantly, the decoding unit is very compact, so relevant tasks don't need a corresponding transformation unit down the pipeline, keeping the whole ensemble lean, in the very spirit of ingestr.
1 parent c90edf2 commit 5000786
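To make the shape of the new decoding unit easier to picture, here is a minimal, hypothetical sketch of how `KafkaDecodingOptions` and `KafkaEventProcessor` might fit together. The actual implementation lives in `ingestr/src/kafka/helpers.py`, which is not part of the excerpts below; every attribute name, the folding of the `KafkaEvent` output formatting into the processor, and the `msg` accessors are assumptions for illustration only.

```python
# Illustrative sketch only; not the code introduced by this commit.
import json
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class KafkaDecodingOptions:
    # Decoding options parsed from the source URI; names mirror the URI parameters.
    key_type: Optional[str] = None       # e.g. "json"
    value_type: Optional[str] = None     # e.g. "json"
    format: str = "standard_v1"          # "standard_v1" | "standard_v2" | "flexible"
    include: List[str] = field(default_factory=list)
    select: Optional[str] = None


class KafkaEventProcessor:
    # Ties decoding options and output formatting together for each consumed message.

    def __init__(self, options: Optional[KafkaDecodingOptions] = None):
        self.options = options or KafkaDecodingOptions()

    def process(self, msg) -> Any:
        opts = self.options
        key, value = msg.key(), msg.value()
        if opts.key_type == "json" and key is not None:
            key = json.loads(key)
        if opts.value_type == "json" and value is not None:
            value = json.loads(value)
        event = {
            "key": key,
            "value": value,
            "topic": msg.topic(),
            "partition": msg.partition(),
            "offset": msg.offset(),
            "ts": msg.timestamp()[1],
        }
        if opts.select:   # projection: drill down into a single field, e.g. select=value
            return event[opts.select]
        if opts.include:  # selection: limit the output to the listed fields
            return {k: v for k, v in event.items() if k in opts.include}
        return event
```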

File tree: 9 files changed, +460 -58 lines changed


docs/supported-sources/kafka.md

Lines changed: 56 additions & 5 deletions
@@ -10,29 +10,80 @@ The URI format for Apache Kafka is as follows:
 kafka://?bootstrap_servers=localhost:9092&group_id=test_group&security_protocol=SASL_SSL&sasl_mechanisms=PLAIN&sasl_username=example_username&sasl_password=example_secret&batch_size=1000&batch_timeout=3
 ```
 
-URI parameters:
+### URI parameters
+
+Connectivity options:
 - `bootstrap_servers`: Required, the Kafka server or servers to connect to, typically in the form of a host and port, e.g. `localhost:9092`
 - `group_id`: Required, the consumer group ID used for identifying the client when consuming messages.
 - `security_protocol`: The protocol used to communicate with brokers, e.g. `SASL_SSL` for secure communication.
 - `sasl_mechanisms`: The SASL mechanism to be used for authentication, e.g. `PLAIN`.
 - `sasl_username`: The username for SASL authentication.
 - `sasl_password`: The password for SASL authentication.
+
+Transfer options:
 - `batch_size`: The number of messages to fetch in a single batch, defaults to 3000.
 - `batch_timeout`: The maximum time to wait for messages, defaults to 3 seconds.
 
+Decoding options:
+- `key_type`: The data type of the Kafka event `key` field. Possible values: `json`.
+- `value_type`: The data type of the Kafka event `value` field. Possible values: `json`.
+- `format`: The output format/layout. Possible values: `standard_v1`, `standard_v2`, `flexible`.
+- `include`: Which fields to include in the output, comma-separated.
+- `select`: Which field to select (pick) into the output.
+
 The URI is used to connect to the Kafka brokers for ingesting messages.
+When using the `include` or `select` option, the decoder will automatically
+select the `flexible` output format.
 
 ### Group ID
 The group ID is used to identify the consumer group that reads messages from a topic. Kafka uses the group ID to manage consumer offsets and assign partitions to consumers, which means that the group ID is the key to reading messages from the correct partition and position in the topic.
 
-Once you have your Kafka server, credentials, and group ID set up, here's a sample command to ingest messages from a Kafka topic into a DuckDB database:
+## Examples
+
+### Kafka to DuckDB
+
+Once you have your Kafka server, credentials, and group ID set up,
+here are a few sample commands to ingest messages from a Kafka topic into a destination database:
 
+Transfer data using the traditional `standard_v1` output format into DuckDB.
+The result of this command will be a table in the `kafka.duckdb` database with JSON columns.
 ```sh
 ingestr ingest \
-    --source-uri 'kafka://?bootstrap_servers=localhost:9092&group_id=test_group' \
+    --source-uri 'kafka://?bootstrap_servers=localhost:9092&group_id=test' \
     --source-table 'my-topic' \
-    --dest-uri duckdb:///kafka.duckdb \
+    --dest-uri 'duckdb:///kafka.duckdb' \
     --dest-table 'dest.my_topic'
 ```
 
-The result of this command will be a table in the `kafka.duckdb` database with JSON columns.
+### Kafka to PostgreSQL
+
+Transfer data converging the Kafka event `value` into a PostgreSQL destination
+table, after decoding from JSON, using the `flexible` output format.
+```sh
+echo '{"sensor_id":1,"ts":"2025-06-01 10:00","reading":42.42}' | kcat -P -b localhost -t demo
+```
+```sh
+ingestr ingest \
+    --source-uri 'kafka://?bootstrap_servers=localhost:9092&group_id=test&value_type=json&select=value' \
+    --source-table 'demo' \
+    --dest-uri 'postgres://postgres:postgres@localhost:5432/?sslmode=disable' \
+    --dest-table 'public.kafka_demo'
+```
+The result of this command will be the `public.kafka_demo` table using
+the Kafka event `value`'s top-level JSON keys as table columns.
+```sh
+psql "postgresql://postgres:postgres@localhost:5432/" \
+    -c '\d+ public.kafka_demo' \
+    -c 'select * from public.kafka_demo;'
+```
+```text
+             Table "public.kafka_demo"
+
+    Column    |           Type           |
+--------------+--------------------------+
+ sensor_id    | bigint                   |
+ ts           | timestamp with time zone |
+ reading      | double precision         |
+ _dlt_load_id | character varying        |
+ _dlt_id      | character varying        |
+```
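For reference, the `include` and `select` options documented above act on the decoded event before it is handed to the destination. The snippet below is a plain-Python illustration of that behaviour, not ingestr's actual code path; the event dictionary layout shown is an assumption.

```python
# Illustration only: the event layout is assumed, not ingestr's internal representation.
event = {
    "key": "test",
    "value": {"sensor_id": 1, "ts": "2025-06-01 10:00", "reading": 42.42},
    "topic": "demo",
    "partition": 0,
    "offset": 0,
}

# include=partition,topic,key,offset keeps only the listed fields.
include = ["partition", "topic", "key", "offset"]
print({k: v for k, v in event.items() if k in include})

# select=value drills down into a single field, so the decoded payload's
# top-level keys (sensor_id, ts, reading) become the destination table columns.
print(event["value"])
```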

ingestr/main_test.py

Lines changed: 141 additions & 8 deletions
@@ -30,6 +30,7 @@
 import requests
 import sqlalchemy
 from confluent_kafka import Producer  # type: ignore
+from confluent_kafka.admin import AdminClient  # type: ignore
 from dlt.sources.filesystem import glob_files
 from fsspec.implementations.memory import MemoryFileSystem  # type: ignore
 from sqlalchemy.pool import NullPool
@@ -1284,19 +1285,39 @@ def as_datetime2(date_str: str) -> datetime:
     return datetime.strptime(date_str, "%Y-%m-%d")
 
 
+@pytest.fixture(scope="session")
+def kafka_service():
+    """
+    Provide a Kafka service container for the whole test session.
+    """
+    container = KafkaContainer("confluentinc/cp-kafka:7.6.0")
+    container.start()
+    yield container
+    container.stop()
+
+
+@pytest.fixture(scope="function")
+def kafka(kafka_service):
+    """
+    Provide a Kafka service container using a clean canvas.
+    Before invoking the test case, delete all relevant topics completely.
+    """
+    admin = AdminClient({"bootstrap.servers": kafka_service.get_bootstrap_server()})
+    admin.delete_topics(["test_topic"])
+    admin.poll(1)
+    return kafka_service
+
+
 @pytest.mark.parametrize(
     "dest", list(DESTINATIONS.values()), ids=list(DESTINATIONS.keys())
 )
-def test_kafka_to_db(dest):
+def test_kafka_to_db_incremental(kafka, dest):
+    """
+    Validate standard Kafka event decoding, focusing on both metadata and data payload.
+    """
     with ThreadPoolExecutor() as executor:
         dest_future = executor.submit(dest.start)
-        source_future = executor.submit(
-            KafkaContainer("confluentinc/cp-kafka:7.6.0").start, timeout=120
-        )
         dest_uri = dest_future.result()
-        kafka = source_future.result()
-
-    # kafka = KafkaContainer("confluentinc/cp-kafka:7.6.0").start(timeout=60)
 
     # Create Kafka producer
     producer = Producer({"bootstrap.servers": kafka.get_bootstrap_server()})
@@ -1357,7 +1378,119 @@ def get_output_table():
     assert res[2] == ("message3",)
     assert res[3] == ("message4",)
 
-    kafka.stop()
+
+@pytest.mark.parametrize(
+    "dest", list(DESTINATIONS.values()), ids=list(DESTINATIONS.keys())
+)
+def test_kafka_to_db_decode_json(kafka, dest):
+    """
+    Validate slightly more advanced Kafka event decoding, focusing on the payload value this time.
+
+    This exercise uses the `value_type=json` and `select=value` URL parameters.
+    """
+    with ThreadPoolExecutor() as executor:
+        dest_future = executor.submit(dest.start)
+        dest_uri = dest_future.result()
+
+    # Create Kafka producer
+    producer = Producer({"bootstrap.servers": kafka.get_bootstrap_server()})
+
+    # Create topic and send messages
+    topic = "test_topic"
+    messages = [
+        {"id": 1, "temperature": 42.42, "humidity": 82},
+        {"id": 2, "temperature": 451.00, "humidity": 15},
+    ]
+
+    for message in messages:
+        producer.produce(topic, json.dumps(message))
+    producer.flush()
+
+    def run():
+        res = invoke_ingest_command(
+            f"kafka://?bootstrap_servers={kafka.get_bootstrap_server()}&group_id=test_group&value_type=json&select=value",
+            "test_topic",
+            dest_uri,
+            "testschema.output",
+        )
+        assert res.exit_code == 0
+
+    def get_output_table():
+        dest_engine = sqlalchemy.create_engine(dest_uri)
+        with dest_engine.connect() as conn:
+            res = (
+                conn.execute(
+                    "SELECT id, temperature, humidity FROM testschema.output WHERE temperature >= 38.00 ORDER BY id ASC"
+                )
+                .mappings()
+                .fetchall()
+            )
+        dest_engine.dispose()
+        return res
+
+    run()
+
+    res = get_output_table()
+    assert len(res) == 2
+    assert res[0] == messages[0]
+    assert res[1] == messages[1]
+
+
+@pytest.mark.parametrize(
+    "dest", list(DESTINATIONS.values()), ids=list(DESTINATIONS.keys())
+)
+def test_kafka_to_db_include_metadata(kafka, dest):
+    """
+    Validate slightly more advanced Kafka event decoding, focusing on metadata this time.
+
+    This exercise uses the `include=` URL parameter.
+    """
+    with ThreadPoolExecutor() as executor:
+        dest_future = executor.submit(dest.start)
+        dest_uri = dest_future.result()
+
+    # Create Kafka producer
+    producer = Producer({"bootstrap.servers": kafka.get_bootstrap_server()})
+
+    # Create topic and send messages
+    topic = "test_topic"
+    messages = [
+        {"id": 1, "temperature": 42.42, "humidity": 82},
+        {"id": 2, "temperature": 451.00, "humidity": 15},
+    ]
+
+    for message in messages:
+        producer.produce(topic=topic, value=json.dumps(message), key="test")
+    producer.flush()
+
+    def run():
+        res = invoke_ingest_command(
+            f"kafka://?bootstrap_servers={kafka.get_bootstrap_server()}&group_id=test_group&include=partition,topic,key,offset,ts",
+            "test_topic",
+            dest_uri,
+            "testschema.output",
+        )
+        assert res.exit_code == 0
+
+    def get_output_table():
+        dest_engine = sqlalchemy.create_engine(dest_uri)
+        with dest_engine.connect() as conn:
+            res = (
+                conn.execute(
+                    'SELECT "partition", "topic", "key", "offset" FROM testschema.output ORDER BY "ts__value" ASC'
+                )
+                .mappings()
+                .fetchall()
+            )
        dest_engine.dispose()
+        return res
+
+    run()
+
+    res = get_output_table()
+    assert len(res) == 2
+    assert res[0] == {"partition": 0, "topic": "test_topic", "key": "test", "offset": 0}
+    assert res[1] == {"partition": 0, "topic": "test_topic", "key": "test", "offset": 1}
 
 
 @pytest.mark.parametrize(

ingestr/src/kafka/__init__.py

Lines changed: 8 additions & 8 deletions
@@ -1,6 +1,6 @@
 """A source to extract Kafka messages.
 
-When extraction starts, partitions length is checked -
+When extraction starts, partition length is checked -
 data is read only up to it, overriding the default Kafka's
 behavior of waiting for new messages in endless loop.
 """
@@ -16,8 +16,8 @@
 
 from .helpers import (
     KafkaCredentials,
+    KafkaEventProcessor,
     OffsetTracker,
-    default_msg_processor,
 )
 
 
@@ -29,9 +29,7 @@
 def kafka_consumer(
     topics: Union[str, List[str]],
     credentials: Union[KafkaCredentials, Consumer] = dlt.secrets.value,
-    msg_processor: Optional[
-        Callable[[Message], Dict[str, Any]]
-    ] = default_msg_processor,
+    msg_processor: Optional[Callable[[Message], Dict[str, Any]]] = None,
     batch_size: Optional[int] = 3000,
     batch_timeout: Optional[int] = 3,
     start_from: Optional[TAnyDateTime] = None,
@@ -49,17 +47,19 @@ def kafka_consumer(
             Auth credentials or an initiated Kafka consumer. By default,
             is taken from secrets.
         msg_processor(Optional[Callable]): A function-converter,
-            which'll process every Kafka message after it's read and
-            before it's transfered to the destination.
+            which will process every Kafka message after it is read and
+            before it is transferred to the destination.
         batch_size (Optional[int]): Messages batch size to read at once.
         batch_timeout (Optional[int]): Maximum time to wait for a batch
-            consume, in seconds.
+            to be consumed in seconds.
         start_from (Optional[TAnyDateTime]): A timestamp, at which to start
            reading. Older messages are ignored.

    Yields:
        Iterable[TDataItem]: Kafka messages.
    """
+    msg_processor = msg_processor or KafkaEventProcessor().process
+
    if not isinstance(topics, list):
        topics = [topics]
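For completeness, here is a short sketch of how a caller could still pass a custom `msg_processor` after this change, and how the new default kicks in. Only the `kafka_consumer` signature and the `KafkaEventProcessor().process` fallback come from the diff above; the dlt pipeline boilerplate and the `uppercase_value` helper are hypothetical.

```python
# Illustrative sketch: wiring a custom msg_processor into kafka_consumer.
import dlt
from confluent_kafka import Message

from ingestr.src.kafka import kafka_consumer


def uppercase_value(msg: Message) -> dict:
    # Hypothetical converter: keep just the topic and an upper-cased payload.
    return {"topic": msg.topic(), "value": msg.value().decode("utf-8").upper()}


# With an explicit msg_processor, behaviour is unchanged from before.
pipeline = dlt.pipeline(pipeline_name="kafka_custom", destination="duckdb")
pipeline.run(
    kafka_consumer("my-topic", msg_processor=uppercase_value),
    table_name="my_topic",
)

# When msg_processor is omitted, kafka_consumer now falls back to
# KafkaEventProcessor().process, as introduced by this commit.
```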
