A Data Lakehouse Proof-of-Concept for streaming CDC events from a database into cloud storage using Delta Lake
flowchart LR
A[(postgres)] --> |Debezium| B[Kafka Connect]
B --> C{Kafka}
C -->D("Delta Lake (Bronze)")
subgraph "Spark Structured Streaming"
D -->E("Delta Lake (Silver)")
end
Please make sure you have a Docker runtime installed and running.
make scala-build
Runs sbt assembly inside the Docker container and creates your app JAR.
make spark-build
Builds the Spark image with the app JAR from the previous step.
make up
Starts all services in daemon mode. NOTE: the connect service takes some time to start up (approx. 2-5 minutes), so follow the logs until Connect is ready to handle requests.
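If you would rather poll than watch the logs, here is a minimal sketch that retries a readiness check until it succeeds or gives up. It assumes Connect's REST API is reachable at http://localhost:8083/ (the Kafka Connect default port; confirm against this project's service configuration) and that the root endpoint answers HTTP 200 once the worker is up. The object and method names are hypothetical.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.time.Duration
import scala.util.Try

object ConnectReadiness {
  // Calls `check` up to `maxAttempts` times, sleeping `delayMs` between
  // attempts. Exceptions (e.g. connection refused) count as "not ready".
  def waitUntilReady(check: () => Boolean, maxAttempts: Int, delayMs: Long): Boolean = {
    var attempt = 1
    var ready = false
    while (!ready && attempt <= maxAttempts) {
      ready = Try(check()).getOrElse(false)
      if (!ready && attempt < maxAttempts) Thread.sleep(delayMs)
      attempt += 1
    }
    ready
  }

  // ASSUMPTION: Connect's REST API listens on localhost:8083 and its root
  // endpoint returns HTTP 200 once the worker can handle requests.
  def connectRestCheck(url: String = "http://localhost:8083/"): () => Boolean = {
    val client = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(2)).build()
    val request = HttpRequest.newBuilder(URI.create(url)).GET().build()
    () => client.send(request, HttpResponse.BodyHandlers.ofString()).statusCode() == 200
  }
}
```

For example, `ConnectReadiness.waitUntilReady(ConnectReadiness.connectRestCheck(), maxAttempts = 60, delayMs = 5000)` waits up to about five minutes, in line with the start-up time noted above.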
make start-postgress-connector
Creates the Debezium Postgres connector in Kafka Connect and starts an initial snapshot of the Postgres tables.
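The make target handles connector creation for you. For reference, Kafka Connect creates a connector from a JSON body of the form {"name": ..., "config": {...}} POSTed to its /connectors REST endpoint. The sketch below renders such a body. It assumes Debezium 2.x property names (2.x uses topic.prefix where 1.x used database.server.name); the hostname, credentials, database name, prefix and table list are hypothetical placeholders, not this project's actual settings.

```scala
object DebeziumConnectorPayload {
  // Minimal JSON string quoting: escapes quotes, backslashes and newlines only.
  private def quote(s: String): String = {
    val sb = new StringBuilder("\"")
    s.foreach {
      case '"'  => sb.append("\\\"")
      case '\\' => sb.append("\\\\")
      case '\n' => sb.append("\\n")
      case c    => sb.append(c)
    }
    sb.append('"').toString
  }

  // Renders {"name": ..., "config": {...}}; keys are sorted so output is deterministic.
  def render(name: String, config: Map[String, String]): String = {
    val fields = config.toSeq.sortBy(_._1).map { case (k, v) => quote(k) + ":" + quote(v) }
    "{\"name\":" + quote(name) + ",\"config\":{" + fields.mkString(",") + "}}"
  }

  // HYPOTHETICAL values for illustration only.
  val example: Map[String, String] = Map(
    "connector.class"    -> "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name"        -> "pgoutput",
    "database.hostname"  -> "postgres",
    "database.port"      -> "5432",
    "database.user"      -> "postgres",
    "database.password"  -> "postgres",
    "database.dbname"    -> "postgres",
    "topic.prefix"       -> "dbserver1",
    "table.include.list" -> "inventory.customers"
  )
}
```

The rendered string would be sent with Content-Type: application/json to the Connect worker's /connectors endpoint.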
make consume-messages
Starts the avro-console-consumer script from the Schema Registry service. Use it to verify that messages are in Kafka and can be consumed. Press Ctrl+C to exit.
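The avro-console-consumer can decode the messages because, assuming Debezium is configured with Confluent's Avro converter (suggested by the Schema Registry service above), each Kafka value uses the Schema Registry wire format: a magic byte 0, a 4-byte big-endian schema ID, then the Avro-encoded payload. A minimal sketch of reading that header; the names are hypothetical and Avro decoding itself is out of scope.

```scala
import java.nio.ByteBuffer

object ConfluentWireFormat {
  // Header fields of a Schema Registry framed message.
  final case class Framed(schemaId: Int, payload: Array[Byte])

  // Byte 0 must be the magic byte 0; bytes 1-4 hold the schema ID (big-endian);
  // everything after byte 4 is the Avro-encoded record.
  def parse(message: Array[Byte]): Either[String, Framed] =
    if (message.length < 5) Left(s"too short: ${message.length} bytes")
    else if (message(0) != 0) Left(s"unexpected magic byte: ${message(0)}")
    else {
      val schemaId = ByteBuffer.wrap(message, 1, 4).getInt() // ByteBuffer defaults to big-endian
      Right(Framed(schemaId, message.drop(5)))
    }
}
```

The schema ID is what a consumer looks up in Schema Registry to obtain the writer schema for the payload.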
To start producing Delta Lake tables, run the following commands. Wait until the Bronze table has been created before starting the Silver table process.
make bronze
(in a new terminal) Starts the Kafka -> Delta table (Bronze layer) process.
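Debezium names each PostgreSQL topic <topic.prefix>.<schema>.<table>, which lines up with the bronze table location used in the Spark shell examples (s3a://warehouse/bronze/inventory/customers). A sketch of that mapping; the prefix dbserver1 and the rule of dropping the prefix are assumptions for illustration, not necessarily what the bronze job does.

```scala
object BronzePaths {
  // Debezium PostgreSQL topics are named <topic.prefix>.<schema>.<table>.
  // HYPOTHETICAL mapping: drop the prefix and reuse schema/table as the path suffix.
  def bronzePathFor(topic: String, root: String = "s3a://warehouse/bronze"): Option[String] =
    topic.split('.') match {
      case Array(_, schema, table) => Some(s"$root/$schema/$table")
      case _                       => None
    }
}
```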
make silver
(in a new terminal) Starts the Delta table (Bronze) -> Delta table (Silver) process.
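This README does not show how the Silver job applies changes. One common approach is to keep the latest change per primary key and drop keys whose latest change is a delete (in Spark this is typically done with a Delta MERGE). The pure-Scala sketch below models that logic only. It assumes events are already flattened to an id, a Debezium op code ("c" create, "u" update, "d" delete, "r" snapshot read), an ordering key (here called lsn) and the after-image; all names are hypothetical.

```scala
object CdcCollapse {
  // One flattened change event; `after` is None for deletes.
  final case class ChangeEvent(id: Int, op: String, lsn: Long, after: Option[Map[String, String]])

  // Latest event per id wins; ids whose latest event is a delete are dropped.
  def currentState(events: Seq[ChangeEvent]): Map[Int, Map[String, String]] =
    events
      .groupBy(_.id)
      .map { case (id, history) => id -> history.maxBy(_.lsn) }
      .collect { case (id, latest) if latest.op != "d" =>
        id -> latest.after.getOrElse(Map.empty[String, String])
      }
}
```

Because the latest event is chosen by lsn rather than arrival order, the result does not depend on the order in which events are read.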
Run make login-postgres to log in to the database (or use a DB client of your choice) and make changes to the customers table. Here is a starting point:
update customers set first_name = 'Jerry', last_name = 'Garcia' where id = 1002;
update customers set first_name = 'Jerry', last_name = 'Garcia' where id = 1003;
update customers set first_name = 'Jerry', last_name = 'Garcia' where id = 1004;
delete from customers where id = 1004;
alter table customers add column test_add_column varchar default null;
insert into customers (id, first_name, last_name, email, test_add_column) values (1005, 'Prince', 'Nelson', '[email protected]', 'test_add');
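The alter table statement adds test_add_column, so rows written before and after the change have different columns. This README does not say how the pipeline reconciles them; the sketch below shows one simple model, similar in spirit to Delta Lake's mergeSchema behaviour for added columns: new columns are appended to the existing schema, and older rows read as None (null) for them. Names are hypothetical.

```scala
object SchemaAlignment {
  // Appends columns that only appear in `incoming`, keeping the existing order.
  def mergeColumns(existing: Seq[String], incoming: Seq[String]): Seq[String] =
    existing ++ incoming.filterNot(c => existing.contains(c)).distinct

  // Projects a row onto `columns`, using None for columns the row lacks
  // (e.g. rows written before test_add_column existed).
  def align(row: Map[String, String], columns: Seq[String]): Seq[Option[String]] =
    columns.map(c => row.get(c))
}
```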
Visit http://localhost:9001 to access the MinIO console. Log in with minioadmin/minioadmin (username/password) to view the Delta Lake objects.
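In the MinIO console, each Delta table directory (for example warehouse/bronze/inventory/customers) contains a _delta_log/ folder. Delta Lake names each commit file after its table version, zero-padded to 20 digits, so version 3 is 00000000000000000003.json; checkpoint files such as 00000000000000000010.checkpoint.parquet sit alongside them. A small sketch for mapping between versions and commit file names (hypothetical helper names):

```scala
object DeltaLogFiles {
  // Matches commit files only, e.g. 00000000000000000003.json.
  private val CommitFile = """(\d{20})\.json""".r

  // Version -> commit file name (20-digit zero-padded version + ".json").
  def commitFile(version: Long): String = f"$version%020d.json"

  // Commit file name -> version; None for checkpoints and other log objects.
  def versionOf(fileName: String): Option[Long] = fileName match {
    case CommitFile(digits) => Some(digits.toLong)
    case _                  => None
  }
}
```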
Run make spark-shell in another terminal window to start a Spark shell, and use the following code to create and display the bronze and silver DataFrames.
// set delta locations
val deltaBronzePath = "s3a://warehouse/bronze/inventory/customers"
val deltaSilverPath = "s3a://warehouse/silver/inventory/customers"
// create dataframes
val bronzeDf = spark.read.format("delta").load(deltaBronzePath)
val silverDf = spark.read.format("delta").load(deltaSilverPath)
// display dataframes
bronzeDf.show(false)
silverDf.show(false)
Using Spark SQL, we can inspect a Delta Lake table's history and query earlier versions of it (time travel):
// describe history of delta table
spark.sql(s"DESCRIBE HISTORY delta.`${deltaSilverPath}`").show()
// show delta table from a specific version
spark.sql(s"select * from delta.`${deltaSilverPath}` VERSION AS OF 1").show(false)
// show delta table as of a specific timestamp (replace the 'yyyy-MM-dd HH:mm' placeholder with a real timestamp)
spark.sql(s"select * from delta.`${deltaSilverPath}` TIMESTAMP AS OF 'yyyy-MM-dd HH:mm'").show(false)
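To pick a sensible timestamp, it helps to know how a timestamp maps to a version: conceptually, TIMESTAMP AS OF reads the latest version committed at or before the given time, using the commit timestamps that DESCRIBE HISTORY shows. The sketch below is a simplified model of that resolution with hypothetical names; it ignores edge cases such as timestamps outside the table's retained history, which real Delta Lake handles itself.

```scala
object TimeTravel {
  // One row of DESCRIBE HISTORY, reduced to the two fields used here.
  final case class Commit(version: Long, timestampMillis: Long)

  // Latest version committed at or before `asOfMillis`;
  // None if the timestamp precedes the first commit.
  def versionAsOf(history: Seq[Commit], asOfMillis: Long): Option[Long] = {
    val eligible = history.filter(_.timestampMillis <= asOfMillis)
    if (eligible.isEmpty) None else Some(eligible.maxBy(_.version).version)
  }
}
```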