Camel Debezium example

Introduction

This example shows how to integrate Camel with Debezium and sink the captured change events into a target database.

This project consists of the following examples:

  1. Send change events captured by the Debezium component to Amazon Kinesis.

  2. Load the data produced by Debezium into Cassandra.

Prerequisites

PostgreSQL

To stream changes from PostgreSQL, the server normally has to be configured for change data capture. For this example, we use the following Docker image, which is already configured and contains some sample data:

$ docker run -it --rm --name pgsql -p 5432:5432 -e POSTGRES_DB=debezium-db -e POSTGRES_USER=pgsql-user -e POSTGRES_PASSWORD=pgsql-pw debezium/example-postgres:1.9

The above command starts a PostgreSQL server listening on port 5432.
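Debezium's PostgreSQL connector relies on logical decoding, which requires `wal_level` to be `logical`. As a quick sanity check, the sketch below asks the running container for that setting. The function name `check_wal_level` is hypothetical, and the sketch assumes the container name, user, and database from the `docker run` command above.

```shell
# Hypothetical helper: succeeds only if the "pgsql" container reports wal_level = logical.
# -t and -A make psql print just the bare value, with no headers or padding.
check_wal_level() {
  wal_level=$(docker exec pgsql psql -U pgsql-user -d debezium-db -tAc 'SHOW wal_level;')
  echo "wal_level is '${wal_level}'"
  [ "$wal_level" = "logical" ]
}
```

After pasting the function into your shell, run `check_wal_level && echo "ready for Debezium"`. If the check fails on your own PostgreSQL server, that server still needs to be configured for logical replication.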

Amazon Kinesis

Since this example uses Kinesis to stream the changes captured by Debezium, you need to create a stream called camel-debezium-example in eu-central-1. You also need to create an AWS access key and secret key. Once you have created the keys, update the following properties in src/main/resources/application.properties:

kinesis.accessKey = generated-access-key
kinesis.secretKey = generated-secret-key
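If you prefer the command line over the AWS console, the stream can be created with the AWS CLI. This is a minimal sketch, assuming the AWS CLI is installed and already configured with credentials that are allowed to create Kinesis streams. The function name `create_example_stream` is hypothetical, and the shard count of 1 is an assumption that should be enough for trying out the example.

```shell
# Hypothetical helper: create the stream this example expects,
# then block until Kinesis reports that the stream exists.
create_example_stream() {
  aws kinesis create-stream \
    --stream-name camel-debezium-example \
    --shard-count 1 \
    --region eu-central-1 &&
  aws kinesis wait stream-exists \
    --stream-name camel-debezium-example \
    --region eu-central-1
}
```

Run `create_example_stream` once before starting the example. Remember to delete the stream when you are done, since Kinesis streams are billed while they exist.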

Cassandra

In this example, Cassandra is the sink for the events. You can either download and run Cassandra on your machine or use the following Docker image, which exposes a Cassandra instance on port 9042:

$ docker run -p 9042:9042 --rm --name cassandra -d cassandra
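A freshly started Cassandra container usually needs some time before it accepts CQL connections. The sketch below polls the container with `cqlsh` until it answers. The function name `wait_for_cassandra` and the `WAIT_SECONDS` variable are hypothetical, and the defaults of 30 attempts with 5 seconds between them are assumptions you may want to adjust.

```shell
# Hypothetical helper: poll the "cassandra" container until cqlsh can connect.
# $1 is the maximum number of attempts (default 30);
# WAIT_SECONDS is the pause between attempts (default 5).
wait_for_cassandra() {
  tries=${1:-30}
  while [ "$tries" -gt 0 ]; do
    # cqlsh exits non-zero while the node is still starting up.
    if docker exec cassandra cqlsh -e 'DESCRIBE KEYSPACES' >/dev/null 2>&1; then
      return 0
    fi
    tries=$((tries - 1))
    sleep "${WAIT_SECONDS:-5}"
  done
  return 1
}
```

For example, `wait_for_cassandra && echo "Cassandra is up"` returns as soon as the node is reachable, or fails after the attempts are used up.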

Once your Cassandra instance is running, use your favorite CQL GUI or cqlsh to execute the following CQL commands, which prepare the keyspace and table for this example:

CREATE KEYSPACE dbzSink WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1};

USE dbzSink;

CREATE TABLE products (
  id int PRIMARY KEY,
  name varchar,
  description varchar,
  weight float
);

Note: We will stream a table called product from the PostgreSQL Docker image, which is already set up. Most of the configuration needed to get started with this example is already in application.properties.

Build

You will need to compile this example first:

$ mvn clean compile

Run

Run the Kinesis producer first:

$ mvn exec:java -Pkinesis-producer

Run the Debezium consumer in a separate shell:

$ mvn exec:java -Pdebezium-consumer

Initially, Debezium performs a snapshot of the tables whitelisted in application.properties, so you should expect the existing data to be replicated into Cassandra. Once the snapshot is done, try inserting a new row, updating fields, or deleting a row in the whitelisted PostgreSQL table(s). The changes should be reflected in Cassandra as well, which you can verify by running the following query in cqlsh:

select * from dbzSink.products;
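To produce a few change events to look for, you can modify the sample data from the shell. The sketch below is an illustration under assumptions, not part of the example project. It assumes the sample table lives at `inventory.products` with an auto-generated `id`, so adjust the table name to whatever is whitelisted in application.properties. The helper names and the test row are hypothetical.

```shell
# Hypothetical helper: run one SQL statement inside the "pgsql" container
# started in the Prerequisites section.
pg_sql() {
  docker exec pgsql psql -U pgsql-user -d debezium-db -c "$1"
}

# Each function triggers one kind of change event (insert, update, delete).
# The table name inventory.products is an assumption about the sample image.
insert_row() {
  pg_sql "INSERT INTO inventory.products (name, description, weight) VALUES ('camel saddle', 'hypothetical test row', 12.5);"
}
update_row() {
  pg_sql "UPDATE inventory.products SET weight = 13.0 WHERE name = 'camel saddle';"
}
delete_row() {
  pg_sql "DELETE FROM inventory.products WHERE name = 'camel saddle';"
}
```

Call `insert_row`, `update_row`, and `delete_row` one at a time, and rerun the cqlsh query between calls to watch each change arrive.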

Configuration

You can configure the details in the file: src/main/resources/application.properties

You can enable verbose logging by adjusting the src/main/resources/log4j2.properties file as documented in the file.

Help and contributions

If you hit any problem using Camel or have some feedback, then please let us know.

We also love contributors, so get involved :-)

The Camel riders!