This setup demonstrates how to capture change events from a MySQL database and stream them to a PostgreSQL database and/or an Elasticsearch server using the Debezium Event Flattening SMT.
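To make the role of the Event Flattening SMT more concrete, the sketch below models in plain Python what flattening means for a single change event. It is an illustration under simplifying assumptions, not Debezium's implementation. The envelope fields `before`, `after` and `op` follow Debezium's change event format, while the function name `flatten`, the sample events, and the choice to return `None` for a delete are made up for this example. How deletes are actually passed on depends on the SMT options set in the connector configuration.

```python
from typing import Any, Dict, Optional

Row = Dict[str, Any]


def flatten(envelope: Row) -> Optional[Row]:
    """Return the row state after the change, or None for a delete.

    Simplified model: a Debezium change event wraps the row in an envelope
    with "before" and "after" images plus an "op" code
    ("c" create, "u" update, "d" delete, "r" snapshot read).
    """
    if envelope.get("op") == "d":
        # Assumption of this sketch: a delete is reported as "no row".
        return None
    return envelope["after"]


# Hypothetical events, shaped like the customers table used in this example.
update_event = {
    "op": "u",
    "before": {"id": 1005, "first_name": "John", "last_name": "Doe"},
    "after": {"id": 1005, "first_name": "Jane", "last_name": "Roe"},
}
delete_event = {"op": "d", "before": update_event["after"], "after": None}

print(flatten(update_event))  # the flat row a sink would receive
print(flatten(delete_event))  # None
```

A sink connector then only has to deal with the flat row, which is the shape a table or a search document expects.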
                   +-------------+
                   |             |
                   |    MySQL    |
                   |             |
                   +------+------+
                          |
                          |
                          |
          +---------------v------------------+
          |                                  |
          |          Kafka Connect           |
          |   (Debezium, JDBC connectors)    |
          |                                  |
          +---------------+------------------+
                          |
                          |
                          |
                          |
                  +-------v--------+
                  |                |
                  |   PostgreSQL   |
                  |                |
                  +----------------+
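We are using Docker Compose to deploy the following components:
- MySQL
- Kafka
- ZooKeeper
- Kafka Broker
- Kafka Connect with Debezium and JDBC Connectors
- PostgreSQL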
How to run:
# Start the application
export DEBEZIUM_VERSION=2.1
docker compose -f docker-compose-jdbc.yaml up --build
# Start PostgreSQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json
# Start MySQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json
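The two `curl` calls above can also be scripted. The following is a minimal sketch using only the Python standard library. It assumes the Kafka Connect REST endpoint is reachable at `http://localhost:8083/connectors/`, as in the commands above, and that the connector definition files are in the current directory. The function names `build_request` and `register_connector` are illustrative.

```python
import json
import urllib.request
from typing import Any, Dict

# Same endpoint as the curl commands above.
CONNECT_URL = "http://localhost:8083/connectors/"


def build_request(definition: Dict[str, Any], url: str = CONNECT_URL) -> urllib.request.Request:
    """Build the POST request that registers one connector definition."""
    return urllib.request.Request(
        url,
        data=json.dumps(definition).encode("utf-8"),
        method="POST",
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )


def register_connector(config_path: str) -> int:
    """Read a definition file, send it, and return the HTTP status code.

    urlopen raises urllib.error.HTTPError for 4xx/5xx responses, so a
    rejected definition surfaces as an exception rather than a return value.
    """
    with open(config_path, encoding="utf-8") as f:
        definition = json.load(f)  # fails early if the file is not valid JSON
    with urllib.request.urlopen(build_request(definition), timeout=10) as resp:
        return resp.status
```

Calling `register_connector("jdbc-sink.json")` followed by `register_connector("source.json")` mirrors the order used above.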
Check contents of the MySQL database:
docker compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | [email protected] |
| 1002 | George | Bailey | [email protected] |
| 1003 | Edward | Walker | [email protected] |
| 1004 | Anne | Kretchmar | [email protected] |
+------+------------+-----------+-----------------------+
Verify that the PostgreSQL database has the same content:
docker compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
last_name | id | first_name | email
-----------+------+------------+-----------------------
Thomas | 1001 | Sally | [email protected]
Bailey | 1002 | George | [email protected]
Walker | 1003 | Edward | [email protected]
Kretchmar | 1004 | Anne | [email protected]
(4 rows)
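The two result sets above list the columns in a different order, so a line-by-line diff of the console output would not match even though the data is the same. A minimal sketch of an order-independent comparison follows. It assumes the rows have already been fetched as dictionaries keyed by column name (fetching them is out of scope here), and the helper names `canonical` and `same_rows` are hypothetical. The sample rows are taken from the output above, with the email column left out.

```python
from typing import Any, Dict, Iterable, List, Tuple

Row = Dict[str, Any]


def canonical(rows: Iterable[Row]) -> List[Tuple[Tuple[str, Any], ...]]:
    """Sort the columns inside each row, then sort the rows themselves."""
    # key=repr keeps the outer sort safe when values have mixed types.
    return sorted((tuple(sorted(row.items())) for row in rows), key=repr)


def same_rows(source: Iterable[Row], sink: Iterable[Row]) -> bool:
    """True if both sides hold the same rows, ignoring row and column order."""
    return canonical(source) == canonical(sink)


mysql_rows = [
    {"id": 1001, "first_name": "Sally", "last_name": "Thomas"},
    {"id": 1002, "first_name": "George", "last_name": "Bailey"},
]
postgres_rows = [
    {"last_name": "Bailey", "id": 1002, "first_name": "George"},
    {"last_name": "Thomas", "id": 1001, "first_name": "Sally"},
]

print(same_rows(mysql_rows, postgres_rows))  # True
```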
Insert a new record into MySQL:
docker compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
mysql> insert into customers values(default, 'John', 'Doe', '[email protected]');
Query OK, 1 row affected (0.02 sec)
Verify that PostgreSQL contains the new record:
docker compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
last_name | id | first_name | email
-----------+------+------------+-----------------------
...
Doe | 1005 | John | [email protected]
(5 rows)
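Changes travel from MySQL through Kafka to the sink asynchronously, so a check run immediately after the insert may still show the old state. When scripting the verification, a small polling helper avoids false alarms. This is a sketch: the name `wait_until` and the default timeout and interval are arbitrary choices, and `check` stands for any zero-argument callable, for example one that runs the query above and looks for the new row.

```python
import time
from typing import Callable


def wait_until(check: Callable[[], bool], timeout: float = 30.0, interval: float = 1.0) -> bool:
    """Call check() repeatedly until it returns True or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        if check():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Simulated sink that only shows the new row on the third poll.
attempts = iter([False, False, True])
print(wait_until(lambda: next(attempts), timeout=5.0, interval=0.01))  # True
```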
Update a record in MySQL:
mysql> update customers set first_name='Jane', last_name='Roe' where last_name='Doe';
Query OK, 1 row affected (0.02 sec)
Rows matched: 1 Changed: 1 Warnings: 0
Verify that the record in PostgreSQL is updated:
docker compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
last_name | id | first_name | email
-----------+------+------------+-----------------------
...
Roe | 1005 | Jane | [email protected]
(5 rows)
Delete a record in MySQL:
mysql> delete from customers where email='[email protected]';
Query OK, 1 row affected (0.01 sec)
Verify that the record in PostgreSQL is deleted:
docker compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
last_name | id | first_name | email
-----------+------+------------+-----------------------
...
(4 rows)
As you can see, there is no longer a 'Jane Roe' among the customers.
End the application:
# Shut down the cluster
docker compose -f docker-compose-jdbc.yaml down
                   +-------------+
                   |             |
                   |    MySQL    |
                   |             |
                   +------+------+
                          |
                          |
                          |
          +---------------v------------------+
          |                                  |
          |          Kafka Connect           |
          |    (Debezium, ES connectors)     |
          |                                  |
          +---------------+------------------+
                          |
                          |
                          |
                          |
                  +-------v--------+
                  |                |
                  | Elasticsearch  |
                  |                |
                  +----------------+
We are using Docker Compose to deploy the following components:
- MySQL
- Kafka
- ZooKeeper
- Kafka Broker
- Kafka Connect with Debezium and Elasticsearch Connectors
- Elasticsearch
How to run:
# Start the application
export DEBEZIUM_VERSION=2.1
docker compose -f docker-compose-es.yaml up --build
# Start Elasticsearch connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @es-sink.json
# Start MySQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json
Check contents of the MySQL database:
docker compose -f docker-compose-es.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | [email protected] |
| 1002 | George | Bailey | [email protected] |
| 1003 | Edward | Walker | [email protected] |
| 1004 | Anne | Kretchmar | [email protected] |
+------+------------+-----------+-----------------------+
Verify that Elasticsearch has the same content:
curl 'http://localhost:9200/customers/_search?pretty'
{
"took" : 42,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
},
"hits" : {
"total" : 4,
"max_score" : 1.0,
"hits" : [
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1001",
"_score" : 1.0,
"_source" : {
"id" : 1001,
"first_name" : "Sally",
"last_name" : "Thomas",
"email" : "[email protected]"
}
},
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1004",
"_score" : 1.0,
"_source" : {
"id" : 1004,
"first_name" : "Anne",
"last_name" : "Kretchmar",
"email" : "[email protected]"
}
},
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1002",
"_score" : 1.0,
"_source" : {
"id" : 1002,
"first_name" : "George",
"last_name" : "Bailey",
"email" : "[email protected]"
}
},
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1003",
"_score" : 1.0,
"_source" : {
"id" : 1003,
"first_name" : "Edward",
"last_name" : "Walker",
"email" : "[email protected]"
}
}
]
}
}
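In the search response above, the rows sit under `hits.hits[]._source`. The sketch below pulls them out so they can be compared with the MySQL output. The function names are hypothetical and the sample response is a shortened version of the output above, with the email field left out. `total_hits` accepts `hits.total` either as a plain number, as shown above, or as an object with a `value` field, which newer Elasticsearch versions return.

```python
import json
from typing import Any, Dict, List


def total_hits(response: Dict[str, Any]) -> int:
    """Read hits.total, whether it is a number or a {"value": ...} object."""
    total = response["hits"]["total"]
    return total["value"] if isinstance(total, dict) else total


def source_rows(response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the _source documents, sorted by id for stable comparison."""
    rows = [hit["_source"] for hit in response["hits"]["hits"]]
    return sorted(rows, key=lambda row: row["id"])


# Shortened sample response (assumption: same shape as the output above).
sample = json.loads("""
{"hits": {"total": 2, "hits": [
  {"_id": "1004", "_source": {"id": 1004, "first_name": "Anne", "last_name": "Kretchmar"}},
  {"_id": "1001", "_source": {"id": 1001, "first_name": "Sally", "last_name": "Thomas"}}
]}}
""")

print(total_hits(sample))
print([row["id"] for row in source_rows(sample)])
```

Sorting by `id` matters because the search response does not return the hits in primary key order, as the output above shows.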
Insert a new record into MySQL:
docker compose -f docker-compose-es.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
mysql> insert into customers values(default, 'John', 'Doe', '[email protected]');
Query OK, 1 row affected (0.02 sec)
Check that Elasticsearch contains the new record:
curl 'http://localhost:9200/customers/_search?pretty'
...
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1005",
"_score" : 1.0,
"_source" : {
"id" : 1005,
"first_name" : "John",
"last_name" : "Doe",
"email" : "[email protected]"
}
}
...
Update a record in MySQL:
mysql> update customers set first_name='Jane', last_name='Roe' where last_name='Doe';
Query OK, 1 row affected (0.02 sec)
Rows matched: 1 Changed: 1 Warnings: 0
Verify that the record in Elasticsearch is updated:
curl 'http://localhost:9200/customers/_search?pretty'
...
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1005",
"_score" : 1.0,
"_source" : {
"id" : 1005,
"first_name" : "Jane",
"last_name" : "Roe",
"email" : "[email protected]"
}
}
...
Delete a record in MySQL:
mysql> delete from customers where email='[email protected]';
Query OK, 1 row affected (0.01 sec)
Verify that the record in Elasticsearch is deleted:
curl 'http://localhost:9200/customers/_search?pretty'
{
"took" : 42,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
},
"hits" : {
"total" : 4,
"max_score" : 1.0,
"hits" : [
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1001",
"_score" : 1.0,
"_source" : {
"id" : 1001,
"first_name" : "Sally",
"last_name" : "Thomas",
"email" : "[email protected]"
}
},
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1004",
"_score" : 1.0,
"_source" : {
"id" : 1004,
"first_name" : "Anne",
"last_name" : "Kretchmar",
"email" : "[email protected]"
}
},
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1002",
"_score" : 1.0,
"_source" : {
"id" : 1002,
"first_name" : "George",
"last_name" : "Bailey",
"email" : "[email protected]"
}
},
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1003",
"_score" : 1.0,
"_source" : {
"id" : 1003,
"first_name" : "Edward",
"last_name" : "Walker",
"email" : "[email protected]"
}
}
]
}
}
As you can see, there is no longer a 'Jane Roe' among the customers.
End the application:
# Shut down the cluster
docker compose -f docker-compose-es.yaml down
                   +-------------+
                   |             |
                   |    MySQL    |
                   |             |
                   +------+------+
                          |
                          |
                          |
          +---------------v------------------+
          |                                  |
          |          Kafka Connect           |
          | (Debezium, JDBC, ES connectors)  |
          |                                  |
          +---+-----------------------+------+
              |                       |
              |                       |
              |                       |
              |                       |
+-------------v--+                +---v---------------+
|                |                |                   |
|   PostgreSQL   |                |   Elasticsearch   |
|                |                |                   |
+----------------+                +-------------------+
We are using Docker Compose to deploy the following components:
- MySQL
- Kafka
- ZooKeeper
- Kafka Broker
- Kafka Connect with Debezium, JDBC and Elasticsearch Connectors
- PostgreSQL
- Elasticsearch
How to run:
# Start the application
export DEBEZIUM_VERSION=2.1
docker compose up --build
# Start Elasticsearch connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @es-sink.json
# Start MySQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json
# Start PostgreSQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json
Now you can execute the commands from the sections for the JDBC and Elasticsearch sinks and verify that inserts and updates are present in both sinks.
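With two sinks running, it helps to see at a glance which one is lagging. The sketch below reports, per sink, the customer ids that are present in the source but missing in that sink. It assumes the id lists have already been collected with the queries from the earlier sections. The function name `missing_ids`, the sink labels, and the sample id lists are illustrative.

```python
from typing import Dict, Iterable, Set


def missing_ids(expected: Iterable[int], sinks: Dict[str, Iterable[int]]) -> Dict[str, Set[int]]:
    """Map each sink name to the ids it lacks compared with the source."""
    wanted = set(expected)
    return {name: wanted - set(ids) for name, ids in sinks.items()}


# Hypothetical state shortly after inserting customer 1005 into MySQL.
report = missing_ids(
    [1001, 1002, 1003, 1004, 1005],
    {
        "postgres": [1001, 1002, 1003, 1004, 1005],
        "elasticsearch": [1001, 1002, 1003, 1004],
    },
)
print(report)
```

An empty set for every sink means both have caught up with the source.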
End the application:
# Shut down the cluster
docker compose down