Skip to content

Commit c9a731b

Browse files
authored
Allow custom serializer / deserializer functions (#21)
1 parent 77bebce commit c9a731b

File tree

2 files changed

+57
-6
lines changed

2 files changed

+57
-6
lines changed

src/index.ts

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,18 @@ export interface Options {
6464
* Timeout in ms after which to stop retrying and just fail. Defaults to 3000 ms.
6565
*/
6666
retryTimeout?: number
67+
68+
/**
69+
* Custom function to control how the payload data is stringified on `.notify()`.
70+
* Use together with the `serialize` option. Defaults to `JSON.parse`.
71+
*/
72+
parse?: (serialized: string) => any
73+
74+
/**
75+
* Custom function to control how the payload data is stringified on `.notify()`.
76+
* Use together with the `parse` option. Defaults to `JSON.stringify`.
77+
*/
78+
serialize?: (data: any) => string
6779
}
6880

6981
function connect (connectionConfig: pg.ClientConfig | undefined, options: Options) {
@@ -113,13 +125,13 @@ function connect (connectionConfig: pg.ClientConfig | undefined, options: Option
113125
}
114126
}
115127

116-
function forwardDBNotificationEvents (dbClient: pg.Client, emitter: TypedEventEmitter<PgListenEvents>) {
128+
function forwardDBNotificationEvents (dbClient: pg.Client, emitter: TypedEventEmitter<PgListenEvents>, parse: (stringifiedData: string) => any) {
117129
const onNotification = (notification: PgNotification) => {
118130
notificationLogger(`Received PostgreSQL notification on "${notification.channel}":`, notification.payload)
119131

120132
let payload
121133
try {
122-
payload = notification.payload ? JSON.parse(notification.payload) : notification.payload
134+
payload = notification.payload !== undefined ? parse(notification.payload) : undefined
123135
} catch (error) {
124136
error.message = `Error parsing PostgreSQL notification payload: ${error.message}`
125137
return emitter.emit("error", error)
@@ -173,7 +185,11 @@ export interface Subscriber {
173185
}
174186

175187
function createPostgresSubscriber (connectionConfig?: pg.ClientConfig, options: Options = {}): Subscriber {
176-
const { paranoidChecking = 30000 } = options
188+
const {
189+
paranoidChecking = 30000,
190+
parse = JSON.parse,
191+
serialize = JSON.stringify
192+
} = options
177193

178194
const emitter = new EventEmitter() as TypedEventEmitter<PgListenEvents>
179195
emitter.setMaxListeners(0) // unlimited listeners
@@ -197,7 +213,7 @@ function createPostgresSubscriber (connectionConfig?: pg.ClientConfig, options:
197213

198214
const initialize = (client: pg.Client) => {
199215
// Wire the DB client events to our exposed emitter's events
200-
cancelEventForwarding = forwardDBNotificationEvents(client, emitter)
216+
cancelEventForwarding = forwardDBNotificationEvents(client, emitter, parse)
201217

202218
dbClient.on("error", (error: any) => {
203219
if (!reinitializingRightNow) {
@@ -282,7 +298,8 @@ function createPostgresSubscriber (connectionConfig?: pg.ClientConfig, options:
282298
},
283299
notify (channelName: string, payload: any) {
284300
notificationLogger(`Sending PostgreSQL notification to "${channelName}":`, payload)
285-
return dbClient.query(`NOTIFY ${format.ident(channelName)}, ${format.literal(JSON.stringify(payload))}`)
301+
const serialized = serialize(payload)
302+
return dbClient.query(`NOTIFY ${format.ident(channelName)}, ${format.literal(serialized)}`)
286303
},
287304
unlisten (channelName: string) {
288305
if (subscribedChannels.indexOf(channelName) === -1) {

test/integration.test.ts

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,41 @@ test("can listen and notify", async t => {
4747
}
4848
})
4949

50-
test("getting notified after connection is terminated", async t => {
50+
test("can use custom `parse` function", async t => {
51+
const notifications: PgParsedNotification[] = []
52+
const receivedPayloads: any[] = []
53+
54+
const connectionString = "postgres://postgres:postgres@localhost:5432/postgres"
55+
56+
const hub = createPostgresSubscriber(
57+
{ connectionString },
58+
{ parse: (base64: string) => Buffer.from(base64, "base64").toString("utf8") }
59+
)
60+
await hub.connect()
61+
62+
let client = new pg.Client({ connectionString })
63+
await client.connect()
64+
65+
try {
66+
await hub.listenTo("test")
67+
await hub.events.on("notification", (notification: PgParsedNotification) => notifications.push(notification))
68+
69+
await client.query(`NOTIFY test, '${Buffer.from("I am a payload.", "utf8").toString("base64")}'`)
70+
await delay(200)
71+
72+
t.deepEqual(notifications, [
73+
{
74+
channel: "test",
75+
payload: "I am a payload.",
76+
processId: notifications[0].processId
77+
}
78+
])
79+
} finally {
80+
await hub.close()
81+
}
82+
})
83+
84+
test.serial("getting notified after connection is terminated", async t => {
5185
const notifications: PgParsedNotification[] = []
5286
const receivedPayloads: any[] = []
5387
let reconnects = 0

0 commit comments

Comments
 (0)