Skip to content

Commit

Permalink
Emit "connected" event (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
andywer authored Sep 1, 2019
1 parent d39914c commit 2a061b7
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 12 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ For details see [dist/index.d.ts](./dist/index.d.ts).

## Error & event handling

#### `instance.events.on("connected", listener: () => void)`

The `connected` event is emitted once after initially establishing the connection and later once after every successful reconnect. Reconnects happen automatically when `pg-listen` detects that the connection closed or became unresponsive.

#### `instance.events.on("error", listener: (error: Error) => void)`

An `error` event is emitted for fatal errors that affect the notification subscription. A standard way of handling those kinds of errors would be to `console.error()`-log the error and terminate the process with a non-zero exit code.
Expand Down
1 change: 1 addition & 0 deletions dist/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export interface PgParsedNotification {
payload?: any;
}
interface PgListenEvents {
connected: () => void;
error: (error: Error) => void;
notification: (notification: PgParsedNotification) => void;
reconnect: (attempt: number) => void;
Expand Down
12 changes: 8 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export interface PgParsedNotification {
}

interface PgListenEvents {
connected: () => void,
error: (error: Error) => void,
notification: (notification: PgParsedNotification) => void,
reconnect: (attempt: number) => void
Expand Down Expand Up @@ -78,7 +79,7 @@ export interface Options {
serialize?: (data: any) => string
}

function connect (connectionConfig: pg.ClientConfig | undefined, options: Options) {
function connect(connectionConfig: pg.ClientConfig | undefined, emitter: TypedEventEmitter<PgListenEvents>, options: Options) {
connectionLogger("Creating PostgreSQL client for notification streaming")

const { retryInterval = 500, retryLimit = Infinity, retryTimeout = 3000 } = options
Expand Down Expand Up @@ -203,7 +204,7 @@ function createPostgresSubscriber (connectionConfig?: pg.ClientConfig, options:
notificationsEmitter.emit(notification.channel, notification.payload)
})

const { dbClient: initialDBClient, reconnect } = connect(connectionConfig, options)
const { dbClient: initialDBClient, reconnect } = connect(connectionConfig, emitter, options)

let closing = false
let dbClient = initialDBClient
Expand Down Expand Up @@ -257,6 +258,8 @@ function createPostgresSubscriber (connectionConfig?: pg.ClientConfig, options:
await Promise.all(subscribedChannels.map(
channelName => dbClient.query(`LISTEN ${format.ident(channelName)}`)
))

emitter.emit("connected")
} catch (error) {
error.message = `Re-initializing the PostgreSQL notification client after connection loss failed: ${error.message}`
connectionLogger(error.stack || error)
Expand All @@ -276,9 +279,10 @@ function createPostgresSubscriber (connectionConfig?: pg.ClientConfig, options:
notifications: notificationsEmitter,

/** Don't forget to call this asyncronous method before doing your thing */
connect () {
async connect () {
initialize(dbClient)
return dbClient.connect()
await dbClient.connect()
emitter.emit("connected")
},
close () {
connectionLogger("Closing PostgreSQL notification listener.")
Expand Down
28 changes: 20 additions & 8 deletions test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@ test("can connect", async t => {
})

test("can listen and notify", async t => {
let connectedEvents = 0
const notifications: PgParsedNotification[] = []
const receivedPayloads: any[] = []

const hub = createPostgresSubscriber({ connectionString: "postgres://postgres:postgres@localhost:5432/postgres" })

hub.events.on("connected", () => connectedEvents++)
hub.events.on("notification", (notification: PgParsedNotification) => notifications.push(notification))
hub.notifications.on("test", (payload: any) => receivedPayloads.push(payload))

await hub.connect()

try {
await hub.listenTo("test")
await hub.events.on("notification", (notification: PgParsedNotification) => notifications.push(notification))
await hub.notifications.on("test", (payload: any) => receivedPayloads.push(payload))

await hub.notify("test", { hello: "world" })
await hub.notify("test2", "should not be received, since not subscribed to channel test2")
Expand All @@ -42,6 +46,7 @@ test("can listen and notify", async t => {
t.deepEqual(receivedPayloads, [
{ hello: "world" }
])
t.is(connectedEvents, 1)
} finally {
await hub.close()
}
Expand All @@ -56,8 +61,9 @@ test("can handle notification without payload", async t => {

try {
await hub.listenTo("test")
await hub.events.on("notification", (notification: PgParsedNotification) => notifications.push(notification))
await hub.notifications.on("test", (payload: any) => receivedPayloads.push(payload))

hub.events.on("notification", (notification: PgParsedNotification) => notifications.push(notification))
hub.notifications.on("test", (payload: any) => receivedPayloads.push(payload))

await hub.notify("test")
await delay(200)
Expand Down Expand Up @@ -110,9 +116,11 @@ test("can use custom `parse` function", async t => {
})

test.serial("getting notified after connection is terminated", async t => {
let connectedEvents = 0
let reconnects = 0

const notifications: PgParsedNotification[] = []
const receivedPayloads: any[] = []
let reconnects = 0

const connectionString = "postgres://postgres:postgres@localhost:5432/postgres"
let client = new pg.Client({ connectionString })
Expand All @@ -122,13 +130,16 @@ test.serial("getting notified after connection is terminated", async t => {
{ connectionString: connectionString + "?ApplicationName=pg-listen-termination-test" },
{ paranoidChecking: 1000 }
)

hub.events.on("connected", () => connectedEvents++)
hub.events.on("notification", (notification: PgParsedNotification) => notifications.push(notification))
hub.events.on("reconnect", () => reconnects++)
hub.notifications.on("test", (payload: any) => receivedPayloads.push(payload))

await hub.connect()

try {
await hub.listenTo("test")
hub.events.on("notification", (notification: PgParsedNotification) => notifications.push(notification))
hub.events.on("reconnect", () => reconnects++)
hub.notifications.on("test", (payload: any) => receivedPayloads.push(payload))

await delay(1000)
debug("Terminating database backend")
Expand Down Expand Up @@ -156,6 +167,7 @@ test.serial("getting notified after connection is terminated", async t => {
{ hello: "world" }
])
t.is(reconnects, 1)
t.is(connectedEvents, 2)
} finally {
debug("Closing the subscriber")
await hub.close()
Expand Down

0 comments on commit 2a061b7

Please sign in to comment.