diff --git a/postgres-failover-slots/00_init.sql b/postgres-failover-slots/00_init.sql new file mode 100644 index 00000000..e6b7884b --- /dev/null +++ b/postgres-failover-slots/00_init.sql @@ -0,0 +1,29 @@ +BEGIN; + +create user replicator with replication encrypted password 'zufsob-kuvtum-bImxa6'; +SELECT pg_create_physical_replication_slot('replication_slot'); + +CREATE SCHEMA inventory; + +-- customers +CREATE TABLE inventory.customers ( + id SERIAL NOT NULL PRIMARY KEY, + first_name VARCHAR(255) NOT NULL, + last_name VARCHAR(255) NOT NULL, + email VARCHAR(255) NOT NULL UNIQUE, + is_test_account BOOLEAN NOT NULL +); + +ALTER SEQUENCE inventory.customers_id_seq RESTART WITH 1001; +ALTER TABLE inventory.customers REPLICA IDENTITY FULL; + +INSERT INTO inventory.customers +VALUES (default, 'Sally', 'Thomas', 'sally.thomas@acme.com', FALSE), + (default, 'George', 'Bailey', 'gbailey@foobar.com', FALSE), + (default, 'Edward', 'Walker', 'ed@walker.com', FALSE), + (default, 'Aidan', 'Barrett', 'aidan@example.com', TRUE), + (default, 'Anne', 'Kretchmar', 'annek@noanswer.org', TRUE), + (default, 'Melissa', 'Cole', 'melissa@example.com', FALSE), + (default, 'Rosalie', 'Stewart', 'rosalie@example.com', FALSE); + +COMMIT; diff --git a/postgres-failover-slots/README.md b/postgres-failover-slots/README.md new file mode 100644 index 00000000..0889affa --- /dev/null +++ b/postgres-failover-slots/README.md @@ -0,0 +1,99 @@ +# Failover Slots with Postgres 17 + +This example shows the usage of Postgres 17 failover replication slots with Debezium +(Debezium 3.0.4 or newer is required). + +The set-up contains a Postgres cluster with primary and read replica, fronted by pgbouncer. +Debezium connects to the cluster via this proxy. +When the primary goes down and the replica gets promoted to new primary, +connecting through the proxy allows Debezium to automatically fail over from previous primary to new without requiring to be reconfigured. + +Start all the components: + +```shell +export DEBEZIUM_VERSION=3.0 +docker compose up +``` + +Get a DB session on the Postgres read replica: + +```shell +docker run --tty --rm -i \ + --network failover-network \ + quay.io/debezium/tooling:1.2 \ + bash -c 'pgcli --prompt "\u@replica:\d> " postgresql://user:top-secret@postgres_replica:5432/inventorydb' +``` + +Add the database name to the connection info and reload the configuration of the read replica: + +```sql +ALTER SYSTEM SET primary_conninfo = 'user=replicator password=''zufsob-kuvtum-bImxa6'' channel_binding=prefer host=postgres_primary port=5432 sslmode=prefer sslnegotiation=postgres sslcompression=0 sslcertmode=allow sslsni=1 ssl_min_protocol_version=TLSv1.2 gssencmode=prefer krbsrvname=postgres gssdelegation=0 target_session_attrs=any load_balance_hosts=disable dbname=inventorydb'; + +SELECT pg_reload_conf(); +``` + +Register an instance of the Postgres connector. It is going to ingest changes from the current primary server (connecting through pgbouncer): + +```shell +# Start Postgres connector +http PUT http://localhost:8083/connectors/inventory-source/config < inventory-source.json + +# Consume messages from a Debezium topic +docker run --tty --rm \ + --network failover-network \ + quay.io/debezium/tooling:1.2 \ + kcat -b kafka:9092 -C -o beginning -q \ + -t dbserver1.inventory.customers | jq .payload + +# Modify a record in the database (current primary): +docker run --tty --rm -i \ + --network failover-network \ + quay.io/debezium/tooling:1.2 \ + bash -c 'pgcli --prompt "\u@primary:\d> " postgresql://user:top-secret@postgres_primary:5432/inventorydb' + +# update inventory.customers set first_name = 'Sarah' where id = 1001; +``` + +Next, fail over to the read replica, promoting it to new primary. +Stop the primary instance and pgbouncer: + +```shell +docker compose stop postgres_primary +docker compose stop pgbouncer +``` + +The connector will lose the connection and enter a restart loop. +Promote the read replica to new primary: + +```sql +select pg_promote(); +``` + +Reconfigure pgbouncer and restart it: + +```yaml +services: + pgbouncer: + image: edoburu/pgbouncer:latest + environment: + - DB_HOST=postgres_replica + ... +``` + +```shell +docker compose up -d +``` + +The connector will establish the connection again, +now connecting to the new primary. +Do one more change on that Postgres node (the previous replica) and observe how the change event shows up in Kafka. + +```sql +update inventory.customers set first_name = 'Rudy', last_name = 'Replica' where id = 1001; +``` + +Shut down the cluster: + +```shell +docker compose down +``` diff --git a/postgres-failover-slots/docker-compose.yaml b/postgres-failover-slots/docker-compose.yaml new file mode 100644 index 00000000..3e7fb13c --- /dev/null +++ b/postgres-failover-slots/docker-compose.yaml @@ -0,0 +1,109 @@ +services: + pgbouncer: + image: edoburu/pgbouncer:latest + environment: + - DB_HOST=postgres_primary + - DB_PORT=5432 + - DB_USER=user + - DB_PASSWORD=top-secret + - ADMIN_USERS=postgres,admin + - AUTH_TYPE=scram-sha-256 + ports: + - 15432:5432 + networks: + - my-network + + # Postgres cluster set-up inspired by https://github.com/eremeykin/pg-primary-replica + postgres_primary: + image: postgres:17-alpine + user: postgres + restart: always + healthcheck: + test: 'pg_isready -U user --dbname=inventorydb' + interval: 10s + timeout: 5s + retries: 5 + ports: + - 5432:5432 + environment: + POSTGRES_USER: user + POSTGRES_DB: inventorydb + POSTGRES_PASSWORD: top-secret + POSTGRES_HOST_AUTH_METHOD: "scram-sha-256\nhost replication all 0.0.0.0/0 md5" + POSTGRES_INITDB_ARGS: "--auth-host=scram-sha-256" + command: | + postgres + -c wal_level=logical + -c hot_standby=on + -c max_wal_senders=10 + -c max_replication_slots=10 + -c synchronized_standby_slots=replication_slot + volumes: + - ./00_init.sql:/docker-entrypoint-initdb.d/00_init.sql + networks: + - my-network + + postgres_replica: + image: postgres:17-alpine + user: postgres + restart: always + healthcheck: + test: 'pg_isready -U user --dbname=inventorydb' + interval: 10s + timeout: 5s + retries: 5 + ports: + - 5433:5432 + environment: + PGUSER: replicator + PGPASSWORD: zufsob-kuvtum-bImxa6 + command: | + bash -c " + until pg_basebackup --pgdata=/var/lib/postgresql/data -R --slot=replication_slot --host=postgres_primary --port=5432 + do + echo 'Waiting for primary to connect...' + sleep 1s + done + echo 'Backup done, starting replica...' + chmod 0700 /var/lib/postgresql/data + postgres -c wal_level=logical -c hot_standby=on -c hot_standby_feedback=1 -c sync_replication_slots=on + " + depends_on: + - postgres_primary + networks: + - my-network + + kafka: + image: quay.io/debezium/kafka:${DEBEZIUM_VERSION} + ports: + - 9092:9092 + - 9093:9093 + environment: + - CLUSTER_ID=oh-sxaDRTcyAr6pFRbXyzA + - BROKER_ID=1 + - KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093 + networks: + - my-network + + connect: + image: quay.io/debezium/connect:${DEBEZIUM_VERSION} + ports: + - 8083:8083 + links: + - kafka + - pgbouncer + environment: + - BOOTSTRAP_SERVERS=kafka:9092 + - GROUP_ID=1 + - CONFIG_STORAGE_TOPIC=my_connect_configs + - OFFSET_STORAGE_TOPIC=my_connect_offsets + - STATUS_STORAGE_TOPIC=my_connect_statuses +# For testing newer connector versions, unpack the connector archive into this +# directory and uncomment the lines below +# volumes: +# - ./debezium-connector-postgres:/kafka/connect/debezium-connector-postgres + networks: + - my-network +networks: + my-network: + name: failover-network diff --git a/postgres-failover-slots/inventory-source.json b/postgres-failover-slots/inventory-source.json new file mode 100644 index 00000000..46751dcd --- /dev/null +++ b/postgres-failover-slots/inventory-source.json @@ -0,0 +1,13 @@ +{ + "connector.class" : "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max" : "1", + "database.hostname" : "pgbouncer", + "database.port" : "5432", + "database.user" : "user", + "database.password" : "top-secret", + "database.dbname" : "inventorydb", + "topic.prefix" : "dbserver1", + "schema.include.list" : "inventory", + "plugin.name" : "pgoutput", + "slot.failover" : "true" +}