Commit 729e70f

daamien and damien authored
Add an example of a WAL decoder (#1845)
This example shows how to build a basic Change Data Capture (CDC) mechanism using the Postgres Logical Decoding capabilities. The changes occurring on the database are serialized into JSON and pushed to a queue (a "logical replication slot") where they can be consumed by remote clients.

A Postgres CDC extension can be used for various purposes:

* Ad hoc replication systems (e.g. Postgres => SQL Server)
* External commit-log for a distributed system (such as Kafka)
* Advanced Monitoring (e.g. Prometheus/Loki)

This example tries to find the right tradeoff between simplicity and usefulness. Currently it has strong limitations but they should be easy to overcome.

Rust really shines in this example compared to similar implementations in C (see the wal2json extension). The Serde crate provides JSON serialization out of the box, whereas all the C implementations are forced to write their own JSON formatter.

Co-authored-by: damien <[email protected]>
1 parent b810e98 commit 729e70f

7 files changed: +490 -1 lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
@@ -7,7 +7,7 @@
 #LICENSE All rights reserved.
 #LICENSE
 #LICENSE Use of this source code is governed by the MIT license that can be found in the LICENSE file.
-
+
 [workspace]
 resolver = "2"
 members = [
@@ -32,6 +32,7 @@ exclude = [
 "pgrx-examples/custom_types",
 "pgrx-examples/custom_sql",
 "pgrx-examples/datetime",
+"pgrx-examples/wal_decoder",
 "pgrx-examples/errors",
 "pgrx-examples/nostd",
 "pgrx-examples/numeric",

pgrx-examples/wal_decoder/.gitignore

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
.DS_Store
.idea/
/target
*.iml
**/*.rs.bk
Cargo.lock

pgrx-examples/wal_decoder/Cargo.toml

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
[package]
name = "wal_decoder"
version = "0.0.0"
edition = "2021"

[lib]
crate-type = ["cdylib", "lib"]

[[bin]]
name = "pgrx_embed_wal_decoder"
path = "./src/bin/pgrx_embed.rs"

[features]
default = ["pg13"]
pg12 = ["pgrx/pg12", "pgrx-tests/pg12" ]
pg13 = ["pgrx/pg13", "pgrx-tests/pg13" ]
pg14 = ["pgrx/pg14", "pgrx-tests/pg14" ]
pg15 = ["pgrx/pg15", "pgrx-tests/pg15" ]
pg16 = ["pgrx/pg16", "pgrx-tests/pg16" ]
pg17 = ["pgrx/pg17", "pgrx-tests/pg17" ]
pg_test = []

[dependencies]
pgrx = { path = "../../pgrx", default-features = false }
serde = "1.0.209"
serde_json = "1.0.128"

[dev-dependencies]
pgrx-tests = { path = "../../pgrx-tests" }

[profile.dev]
panic = "unwind"

[profile.release]
panic = "unwind"
opt-level = 3
lto = "fat"
codegen-units = 1

pgrx-examples/wal_decoder/README.md

Lines changed: 118 additions & 0 deletions
@@ -0,0 +1,118 @@

# A simple Change Data Capture ([CDC]) extension.

This extension will extract the [DML] changes from Postgres WAL
using [Logical Decoding] and export them in JSON format using [serde].

## Principles

* Postgres triggers various callbacks at the different stages of a transaction
* The decoder defines some of these callbacks: begin, change, commit, etc.
* The callbacks extract the changes made during the transaction
* They build Rust structs (Action, Tuple) to represent those changes
* The structs are then serialized into JSON
* The JSON output is sent into a logical replication slot (i.e. a queue)
* The output can be consumed in various ways by a remote client

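The flow listed under Principles can be modeled in plain Rust, without Postgres or pgrx. This is only a sketch under assumptions: the names `Value`, `Tuple`, `Action`, `Decoder` and the hand-written `to_json` are hypothetical, the commit timestamp is left out, and the real extension builds its structs inside the logical decoding callbacks and serializes them with serde rather than by hand.

```rust
// Hypothetical, dependency-free model of the decoder flow. The real extension
// runs inside Postgres callbacks and serializes with serde; JSON is formatted
// by hand here only to keep the sketch self-contained.

/// A column value; only TEXT and INT, like the example decoder.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Text(String),
    Int(i32),
}

/// An ordered list of (column name, value) pairs.
type Tuple = Vec<(String, Value)>;

/// One event pushed to the replication slot.
#[derive(Debug, Clone, PartialEq)]
enum Action {
    Begin,
    Insert { rel: String, new: Tuple },
    Commit { change_count: usize },
}

/// Minimal escaping: backslash and double quote only.
fn escape(s: &str) -> String {
    s.replace('\\', "\\\\").replace('"', "\\\"")
}

fn tuple_to_json(t: &Tuple) -> String {
    let fields: Vec<String> = t
        .iter()
        .map(|(name, value)| match value {
            Value::Text(s) => format!("\"{}\":\"{}\"", escape(name), escape(s)),
            Value::Int(i) => format!("\"{}\":{}", escape(name), i),
        })
        .collect();
    format!("{{{}}}", fields.join(","))
}

impl Action {
    fn to_json(&self) -> String {
        match self {
            Action::Begin => "{\"typ\":\"BEGIN\"}".to_string(),
            Action::Insert { rel, new } => format!(
                "{{\"typ\":\"INSERT\",\"rel\":\"{}\",\"new\":{}}}",
                escape(rel),
                tuple_to_json(new)
            ),
            Action::Commit { change_count } => {
                format!("{{\"typ\":\"COMMIT\",\"change_count\":{}}}", change_count)
            }
        }
    }
}

/// Stand-in for the plugin state: counts changes per transaction and collects
/// the JSON lines that would be written to the slot.
#[derive(Default)]
struct Decoder {
    change_count: usize,
    output: Vec<String>,
}

impl Decoder {
    fn begin(&mut self) {
        self.change_count = 0;
        self.output.push(Action::Begin.to_json());
    }

    fn change(&mut self, rel: &str, new: Tuple) {
        self.change_count += 1;
        let action = Action::Insert { rel: rel.to_string(), new };
        self.output.push(action.to_json());
    }

    fn commit(&mut self) {
        let action = Action::Commit { change_count: self.change_count };
        self.output.push(action.to_json());
    }
}

fn main() {
    let mut decoder = Decoder::default();
    decoder.begin();
    decoder.change(
        "public.person",
        vec![
            ("name".to_string(), Value::Text("Bruce Wayne".to_string())),
            ("age".to_string(), Value::Int(42)),
        ],
    );
    decoder.commit();
    for line in &decoder.output {
        println!("{line}");
    }
}
```

Keeping the per-transaction counter in the decoder state is one simple way to produce a `change_count` at commit time; whether the extension does it this way is not stated here.
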
## Requirements

In order to use this extension with a cargo-pgrx managed instance, you'll
need to add the configuration below in "$PGRX_HOME/data-$PGVER/postgresql.conf".

``` ini
shared_preload_libraries = 'wal_decoder'
wal_level = logical
```

## Example

1- Create a table and publish it

``` sql
CREATE TABLE person (name TEXT, age INT);
ALTER TABLE person REPLICA IDENTITY FULL;
CREATE PUBLICATION gotham_pub FOR TABLE person;
```

2- Create a replication slot fed by the decoder

``` sql
SELECT pg_create_logical_replication_slot('gotham_slot', 'wal_decoder');
```

3- Consume the changes from the replication slot

``` sql
INSERT INTO person
VALUES ('Bruce Wayne',42),('Clark Kent',33);
```

``` sql
SELECT * FROM pg_logical_slot_get_changes('gotham_slot', NULL, NULL);

    lsn    | xid |                                     data
-----------+-----+------------------------------------------------------------------------------
 0/16A87C8 | 581 | {"typ":"BEGIN"}
 0/16A87C8 | 581 | {"typ":"INSERT","rel":"public.person","new":{"name":"Bruce Wayne","age":42}}
 0/16A8810 | 581 | {"typ":"INSERT","rel":"public.person","new":{"name":"Clark Kent","age":33}}
 0/16A8888 | 581 | {"typ":"COMMIT","committed":779145498360779,"change_count":2}
```

``` sql
UPDATE person SET name = 'Batman' WHERE name= 'Bruce Wayne';
```

``` sql
SELECT xid, jsonb_pretty(data::JSONB)
FROM pg_logical_slot_get_changes('gotham_slot', NULL, NULL);

 xid |           jsonb_pretty
-----+-----------------------------------
 587 | {                                +
     |     "typ": "BEGIN"               +
     | }
 587 | {                                +
     |     "new": {                     +
     |         "age": 42,               +
     |         "name": "Batman"         +
     |     },                           +
     |     "old": {                     +
     |         "age": 42,               +
     |         "name": "Bruce Wayne"    +
     |     },                           +
     |     "rel": "public.person",      +
     |     "typ": "UPDATE"              +
     | }
 587 | {                                +
     |     "typ": "COMMIT",             +
     |     "committed": 779179731927669,+
     |     "change_count": 1            +
     | }
```

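A client reading rows like the ones in the Example section may want to check that a transaction arrived complete. The sketch below does so by comparing the number of change rows between `BEGIN` and `COMMIT` with the reported `change_count`. The helper names are hypothetical, and field extraction is naive string matching that only suits the compact output format; a real client would more likely use a JSON parser such as serde_json.

```rust
// Hypothetical consumer-side check: given the rows of one transaction, compare
// the number of changes seen with the "change_count" reported at COMMIT.
// Field extraction is naive string matching with no escape handling.

/// Returns the string value of `"key":"..."` if present.
fn str_field<'a>(json: &'a str, key: &str) -> Option<&'a str> {
    let pat = format!("\"{key}\":\"");
    let start = json.find(&pat)? + pat.len();
    let end = json[start..].find('"')? + start;
    Some(&json[start..end])
}

/// Returns the unsigned integer value of `"key":123` if present.
fn int_field(json: &str, key: &str) -> Option<u64> {
    let pat = format!("\"{key}\":");
    let start = json.find(&pat)? + pat.len();
    let digits: String = json[start..]
        .chars()
        .take_while(|c| c.is_ascii_digit())
        .collect();
    digits.parse().ok()
}

/// Checks one transaction: BEGIN first, COMMIT last, and a matching count.
fn transaction_is_consistent(rows: &[&str]) -> bool {
    let (first, last) = match (rows.first(), rows.last()) {
        (Some(f), Some(l)) if rows.len() >= 2 => (*f, *l),
        _ => return false,
    };
    if str_field(first, "typ") != Some("BEGIN") || str_field(last, "typ") != Some("COMMIT") {
        return false;
    }
    // Everything between BEGIN and COMMIT counts as one change.
    let changes = (rows.len() - 2) as u64;
    int_field(last, "change_count") == Some(changes)
}

fn main() {
    // The `data` column of the first transaction shown in the Example section.
    let rows = [
        r#"{"typ":"BEGIN"}"#,
        r#"{"typ":"INSERT","rel":"public.person","new":{"name":"Bruce Wayne","age":42}}"#,
        r#"{"typ":"INSERT","rel":"public.person","new":{"name":"Clark Kent","age":33}}"#,
        r#"{"typ":"COMMIT","committed":779145498360779,"change_count":2}"#,
    ];
    println!("consistent: {}", transaction_is_consistent(&rows));
}
```
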
## Limitations

This decoder is designed as a basic example and it has the following limitations:

* Only the REPLICA IDENTITY FULL mode is fully supported. Supporting REPLICA IDENTITY DEFAULT
  would require additional work.

* Only TEXT and INT values are serialized. Supporting other types should be trivial.


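As an illustration of the second limitation, supporting more types could largely come down to adding variants to a value type and rendering each one as JSON. The `Value` enum and `escape_json` helper below are hypothetical and dependency-free; the extension's actual types, and the conversion from Postgres datums through pgrx, are not shown in this README and may differ.

```rust
// Hypothetical value type with a few variants beyond TEXT and INT.
// Converting Postgres datums into these variants (the pgrx side) is not shown.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    Text(String),
    Int(i32),
    BigInt(i64),
    Bool(bool),
    Float(f64),
}

impl Value {
    /// Renders the value as a JSON fragment.
    fn to_json(&self) -> String {
        match self {
            Value::Null => "null".to_string(),
            Value::Text(s) => format!("\"{}\"", escape_json(s)),
            Value::Int(i) => i.to_string(),
            Value::BigInt(i) => i.to_string(),
            Value::Bool(b) => b.to_string(),
            // JSON has no NaN or Infinity, so those are rendered as null.
            Value::Float(f) if f.is_finite() => f.to_string(),
            Value::Float(_) => "null".to_string(),
        }
    }
}

/// Escapes the characters that must be escaped inside a JSON string.
fn escape_json(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

fn main() {
    let row = [
        ("name", Value::Text("Selina \"Cat\" Kyle".to_string())),
        ("age", Value::Int(35)),
        ("balance", Value::BigInt(9_000_000_000)),
        ("vigilante", Value::Bool(true)),
        ("height", Value::Float(1.7)),
        ("alias", Value::Null),
    ];
    let fields: Vec<String> = row
        .iter()
        .map(|(k, v)| format!("\"{}\":{}", escape_json(k), v.to_json()))
        .collect();
    println!("{{{}}}", fields.join(","));
}
```

With serde, as the extension uses, most of this rendering would presumably be derived rather than written by hand.
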
## Other WAL decoders

Here are some other implementations in C that can be useful:

* <https://github.com/dalibo/hackingpg/blob/main/journee5/audit/plugin_audit.c>
* <https://github.com/leptonix/decoding-json/blob/master/decoding_json.c>
* <https://github.com/michaelpq/pg_plugins/blob/main/decoder_raw/decoder_raw.c>
* <https://github.com/eulerto/wal2json/blob/master/wal2json.c>

<!-- Links -->

[CDC]: https://en.wikipedia.org/wiki/Change_data_capture
[DML]: https://en.wikipedia.org/wiki/Data_manipulation_language
[Logical Decoding]: https://www.postgresql.org/docs/current/logicaldecoding-explanation.html
[serde]: https://serde.rs

pgrx-examples/wal_decoder/src/bin/pgrx_embed.rs

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
::pgrx::pgrx_embed!();
