Data playground by the lake; meant to simulate end-to-end data pipelines.
- Compose up source services: `docker compose -f docker-compose.source.yaml up`
- Use the FastAPI seeder at localhost:8000/docs and run `/seed/full` to generate records in the source Postgres.
- [Optional] Connect to the Postgres at `localhost:4444` with user:password `postgres:postgres`. You can find the generated records in the `storefront` db.
- Compose up streaming services: `docker compose -f docker-compose.streaming.yaml up`
- Register the Postgres connector with `docker compose -f docker-compose.streaming.yaml run --rm connect-postgres`. This will initially load all records in full and then keep streaming changes via CDC.
- [Optional] Check AKHQ on localhost:8080 to view the created topics and schemas.
- Compose up lake services: `docker compose -f docker-compose.lake.yaml up`
- Check the buckets (`raw`, `lakehouse`) at localhost:9001 with user:password `minio:minio123`.
- Register the S3 connector with `docker compose -f docker-compose.streaming.yaml run --rm connect-s3`. This will initially load all records in full and then keep streaming the registered topics into the `raw` bucket.
```bash
# Source
docker compose -f docker-compose.source.yaml up
# Streaming
docker compose -f docker-compose.streaming.yaml up
# Lake
docker compose -f docker-compose.lake.yaml up
# Register connectors
docker compose -f docker-compose.streaming.yaml run --rm connect-postgres
docker compose -f docker-compose.streaming.yaml run --rm connect-s3
```
```mermaid
flowchart LR
%% Source zone
subgraph Source
direction TB
Seeder@{ shape: rounded, label: "FastAPI Seeder" }
Flyway@{ shape: hex, label: "Flyway" }
Postgres@{ shape: cyl, label: "PostgreSQL" }
Seeder -- "writes" --> Postgres
Flyway -- "runs migrations" --> Postgres
end
%% Streaming zone
subgraph Streaming
direction TB
Connect@{ shape: procs, label: "Kafka Connect" }
SchemaRegistry@{ shape: rounded, label: "Schema Registry (Avro)" }
Kafka@{ shape: rounded, label: "Kafka" }
AKHQ@{ shape: stadium, label: "AKHQ UI" }
ConnectInit@{ shape: sl-rect, label: "Connect Init" }
Connect -- "reads / writes schemas" --> SchemaRegistry
Connect -- "produces (source) / consumes (sink)" --> Kafka
AKHQ --> Kafka
ConnectInit -- "registers connectors" --> Connect
end
%% Lake zone
subgraph Lake
direction TB
subgraph RawBucket ["S3 landing zone"]
direction TB
Raw@{ shape: rect, label: "raw/" }
end
subgraph LakehouseBucket ["S3 lakehouse"]
direction TB
Stage@{ shape: rect, label: "lakehouse/stage/" }
Shape@{ shape: rect, label: "lakehouse/shape/" }
Serve@{ shape: rect, label: "lakehouse/serve/" }
end
end
%% Cross-zone
Connect -- "Postgres Source: polls CDC via Debezium" --> Postgres
Connect -- "S3 Sink: writes raw Avro" --> Raw
Postgres is used as the OLTP source. The default `storefront` database is meant to simulate a web shop.
- Connect via `localhost:4444` | user:password: `postgres:postgres`
- You can create a new database with, e.g.: `docker compose -f docker-compose.source.yaml exec postgres psql -U postgres -d postgres -c "CREATE DATABASE storefront;"`
- If it is meant to be persisted, add the new database to `postgres-init/init.sql`.
Flyway is used as the migration tool.
- Run migrations on the default `storefront` database via: `docker compose -f docker-compose.source.yaml up flyway`
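Adding a schema change then just means dropping another versioned SQL file into `migrations/` and re-running the Flyway service. A hypothetical sketch (file name and table are made up, and it assumes the Flyway service mounts `migrations/`):

```bash
# Create a new versioned migration (Flyway naming: V<version>__<description>.sql) ...
cat > migrations/V3__create_example_table.sql <<'SQL'
CREATE TABLE IF NOT EXISTS example (
    id   SERIAL PRIMARY KEY,
    note TEXT
);
SQL

# ... and apply it against the storefront database
docker compose -f docker-compose.source.yaml up flyway
```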
Use the endpoints on localhost:8000/docs to run an initial fill or to start/stop continuous, simulated traffic.
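If you prefer the command line over the Swagger UI, the seed endpoints can be hit directly; a minimal sketch, assuming `/seed/full` accepts a plain `POST` without a body:

```bash
# Trigger a full seed run against the FastAPI seeder (the HTTP method is an assumption)
curl -X POST http://localhost:8000/seed/full
```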
Kafka is used for streaming CDC events from Postgres via Debezium to the S3 ingest sink in the lake. You can use the AKHQ UI at localhost:8080 to monitor/manage it.
All connectors are defined in `connectors/*.template.json`. `envsubst` is used to substitute `.env` variables.
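For reference, the substitution step amounts to something like the following; the exact wiring in the repo may differ, and the rendered output path here is just for illustration:

```bash
# Export the .env variables, then render a connector template with envsubst
set -a; source .env; set +a
envsubst < connectors/debezium-postgres.template.json > /tmp/debezium-postgres.json
```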
- Register connector:

  ```bash
  # Postgres Source
  docker compose -f docker-compose.streaming.yaml up connect-postgres
  # S3 Sink
  docker compose -f docker-compose.streaming.yaml up connect-s3
  # .. per curl
  curl -X POST -H "Content-Type: application/json" --data "@/connectors/postgres-source.json" http://connect:8083/connectors;
  ```
- List registered connectors: `curl http://localhost:8083/connectors`
- Check connector status: `curl http://localhost:8083/connectors/postgres-source-storefront/status`
- Delete connector: `curl -X DELETE http://localhost:8083/connectors/postgres-source-storefront`
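The Connect REST API also exposes a restart endpoint, which can be handy when a task has failed; for example (the `includeTasks` parameter requires a reasonably recent Connect version):

```bash
# Restart the connector together with all of its tasks
curl -X POST "http://localhost:8083/connectors/postgres-source-storefront/restart?includeTasks=true"
```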
To add a new connector, create a new `*.template.json` in `connectors/` and either register it manually (see the commands above) or add another service to `docker-compose.streaming.yaml`.
Example of resetting a sink connector:

```bash
# Delete connector
curl -X DELETE http://localhost:8083/connectors/s3-sink-storefront
# Reset offsets
docker exec -it kafka kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --group connect-s3-sink-storefront \
  --reset-offsets \
  --all-topics \
  --to-earliest \
  --execute
# Add connector
docker compose -f docker-compose.streaming.yaml run --rm connect-s3
```
Currently, CDC for Postgres is handled by Debezium. This includes:
- the addition of the connector plugin to the `Dockerfile.connect` image
- the creation of the Debezium user with the appropriate roles in Postgres: `migrations/V2__create_debezium_user.sql`
- the proper connector config: `connectors/debezium-postgres.template.json`
- Postgres' write-ahead log set to allow logical replication, see `wal_level=logical` in `compose/postgresql.custom.conf` (a quick check is sketched below)
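To verify the logical replication setup from the database side, a quick check via `psql` (through the compose service) could look like this:

```bash
# Confirm wal_level and inspect the replication slot(s) used by Debezium
docker compose -f docker-compose.source.yaml exec postgres \
  psql -U postgres -d storefront \
  -c "SHOW wal_level;" \
  -c "SELECT slot_name, plugin, active FROM pg_replication_slots;"
```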
Strategy, example with Postgres CDC:
- The default `delete.retention.ms` is `86400000` (1 day). That is, either downstream polls all messages within that window, or the connector has to be reset to push a new initial snapshot to Kafka. To reset the Debezium connector, delete it and re-run the connect-init.
- The default `cleanup.policy` is `delete`. However, `compact,delete` can make sense for a lakehouse downstream (see the sketch after this list for switching a topic).
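If you want to experiment with compaction, the topic config can be changed at runtime; a sketch with a hypothetical CDC topic name (Debezium topics typically follow `<prefix>.<schema>.<table>`):

```bash
# Switch a CDC topic to compact,delete (the topic name below is an assumption)
docker exec -it kafka kafka-configs \
  --bootstrap-server localhost:9092 \
  --alter --entity-type topics \
  --entity-name storefront.public.orders \
  --add-config cleanup.policy=compact,delete
```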
MinIO is used as an AWS S3-equivalent object store. Zones (buckets):
- `raw`: raw data, e.g., Avro messages
- `lakehouse`: structured formats, e.g. Iceberg tables, with the semantic sub-zones `stage`, `shape`, `serve` -- following a classic Medallion architecture.
You can explore the lake buckets using the MinIO UI. Check localhost:9001 to monitor/manage buckets; user:password: `minio:minio123`.
The buckets are created and initialized via the MinIO client `mc` during `docker-compose.lake.yaml` startup, defined in `compose/entrypoint.miniomc.sh`.
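Besides the UI, the buckets can also be browsed from the command line with a throwaway `mc` container; a sketch, assuming the S3 API is published on localhost:9000 (the alias name `local` is arbitrary):

```bash
# List the contents of the raw bucket via the MinIO client (endpoint and port are assumptions)
docker run --rm --network host --entrypoint /bin/sh minio/mc -c \
  "mc alias set local http://localhost:9000 minio minio123 && mc ls --recursive local/raw"
```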
Jupyter notebooks can be found in `scripts/`. Make sure to install the requirements from `scripts/requirements.txt` first.
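A minimal local setup sketch (the virtualenv name is arbitrary, and it assumes JupyterLab is among the requirements):

```bash
# Create an isolated environment, install the notebook requirements, start Jupyter
python -m venv .venv && source .venv/bin/activate
pip install -r scripts/requirements.txt
jupyter lab
```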
Pre-commit is used for linting. Run `pre-commit install` once to initialize it for this repo.
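To lint everything on demand rather than only staged files:

```bash
# One-off run across the whole repo
pre-commit run --all-files
```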