Skip to content

Commit d002135

Browse files
l4mbymagne
andauthored
feat: block message handler execution if consumer is closed (#211)
* feat: ignore local offset update if consumer closed * chore: refactor message handling in consumer --------- Co-authored-by: magne <[email protected]>
1 parent 94bc290 commit d002135

File tree

4 files changed

+133
-47
lines changed

4 files changed

+133
-47
lines changed

example/package-lock.json

Lines changed: 20 additions & 20 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
const rabbit = require("rabbitmq-stream-js-client")
2+
3+
const sleep = (ms) => new Promise((r) => setTimeout(r, ms))
4+
5+
async function main() {
6+
console.log("Connecting...")
7+
const client = await rabbit.connect({
8+
hostname: "localhost",
9+
port: 5552,
10+
username: "rabbit",
11+
password: "rabbit",
12+
vhost: "/",
13+
})
14+
15+
console.log("Making sure the stream exists...")
16+
const streamName = "stream-offset-tracking-javascript"
17+
await client.createStream({ stream: streamName, arguments: {} })
18+
19+
const consumerRef = "offset-tracking-tutorial"
20+
let firstOffset = undefined
21+
let offsetSpecification = rabbit.Offset.first()
22+
try {
23+
const offset = await client.queryOffset({ reference: consumerRef, stream: streamName })
24+
offsetSpecification = rabbit.Offset.offset(offset + 1n)
25+
} catch (e) {}
26+
27+
let lastOffset = offsetSpecification.value
28+
let messageCount = 0
29+
const consumer = await client.declareConsumer(
30+
{ stream: streamName, offset: offsetSpecification, consumerRef },
31+
async (message) => {
32+
messageCount++
33+
if (!firstOffset && messageCount === 1) {
34+
firstOffset = message.offset
35+
console.log("First message received")
36+
}
37+
if (messageCount % 10 === 0) {
38+
await consumer.storeOffset(message.offset)
39+
}
40+
if (message.content.toString() === "marker") {
41+
console.log("Marker found")
42+
lastOffset = message.offset
43+
await consumer.storeOffset(message.offset)
44+
await consumer.close()
45+
}
46+
}
47+
)
48+
49+
console.log(`Start consuming...`)
50+
await sleep(2000)
51+
console.log(`Done consuming, first offset was ${firstOffset}, last offset was ${lastOffset}`)
52+
}
53+
54+
main()
55+
.then(() => process.exit(0))
56+
.catch((res) => {
57+
console.log("Error while receiving message!", res)
58+
process.exit(-1)
59+
})
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
const rabbit = require("rabbitmq-stream-js-client")
2+
3+
async function main() {
4+
console.log("Connecting...")
5+
const client = await rabbit.connect({
6+
vhost: "/",
7+
port: 5552,
8+
hostname: "localhost",
9+
username: "rabbit",
10+
password: "rabbit",
11+
})
12+
13+
console.log("Making sure the stream exists...")
14+
const streamName = "stream-offset-tracking-javascript"
15+
await client.createStream({ stream: streamName, arguments: {} })
16+
17+
console.log("Creating the publisher...")
18+
const publisher = await client.declarePublisher({ stream: streamName })
19+
20+
const messageCount = 100
21+
console.log(`Publishing ${messageCount} messages`)
22+
for (let i = 0; i < messageCount; i++) {
23+
const body = i === messageCount - 1 ? "marker" : `hello ${i}`
24+
await publisher.send(Buffer.from(body))
25+
}
26+
27+
console.log("Closing the connection...")
28+
await client.close()
29+
}
30+
31+
main()
32+
.then(() => console.log("done!"))
33+
.catch((res) => {
34+
console.log("Error in publishing message!", res)
35+
process.exit(-1)
36+
})

src/consumer.ts

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ export class StreamConsumer implements Consumer {
2828
public offset: Offset
2929
private clientLocalOffset: Offset
3030
private creditsHandler: ConsumerCreditPolicy
31-
readonly handle: ConsumerFunc
31+
private consumerHandle: ConsumerFunc
32+
private closed: boolean
3233

3334
constructor(
3435
handle: ConsumerFunc,
@@ -50,10 +51,12 @@ export class StreamConsumer implements Consumer {
5051
this.clientLocalOffset = this.offset.clone()
5152
this.connection.incrRefCount()
5253
this.creditsHandler = params.creditPolicy || defaultCreditPolicy
53-
this.handle = this.wrapHandle(handle, params.offset)
54+
this.consumerHandle = handle
55+
this.closed = false
5456
}
5557

5658
async close(manuallyClose: boolean): Promise<void> {
59+
this.closed = true
5760
this.connection.decrRefCount()
5861
if (ConnectionPool.removeIfUnused(this.connection)) {
5962
await this.connection.close({ closingCode: 0, closingReason: "", manuallyClose })
@@ -79,31 +82,10 @@ export class StreamConsumer implements Consumer {
7982
return this.clientLocalOffset.clone()
8083
}
8184

82-
private wrapHandle(handle: ConsumerFunc, offset: Offset) {
83-
const updateLocalOffsetHandle = this.updateLocalOffsetHandle(handle)
84-
return this.addOffsetFilterToHandle(updateLocalOffsetHandle, offset)
85-
}
86-
87-
private updateLocalOffsetHandle(handle: ConsumerFunc) {
88-
const wrapped = (message: Message) => {
89-
const result = handle(message)
90-
if (message.offset !== undefined) this.clientLocalOffset = Offset.offset(message.offset)
91-
return result
92-
}
93-
return wrapped
94-
}
95-
96-
private addOffsetFilterToHandle(handle: ConsumerFunc, offset: Offset) {
97-
if (offset.type === "numeric") {
98-
const handlerWithFilter = (message: Message) => {
99-
if (message.offset !== undefined && message.offset < offset.value!) {
100-
return
101-
}
102-
handle(message)
103-
}
104-
return handlerWithFilter
105-
}
106-
return handle
85+
public handle(message: Message) {
86+
if (this.closed || this.isMessageOffsetLessThanConsumers(message)) return
87+
this.consumerHandle(message)
88+
this.maybeUpdateLocalOffset(message)
10789
}
10890

10991
public get streamName(): string {
@@ -117,4 +99,13 @@ export class StreamConsumer implements Consumer {
11799
public get creditPolicy() {
118100
return this.creditsHandler
119101
}
102+
103+
private maybeUpdateLocalOffset(message: Message) {
104+
if (message.offset !== undefined) this.clientLocalOffset = Offset.offset(message.offset)
105+
}
106+
107+
// TODO -- Find better name?
108+
private isMessageOffsetLessThanConsumers(message: Message) {
109+
return this.offset.type === "numeric" && message.offset !== undefined && message.offset < this.offset.value!
110+
}
120111
}

0 commit comments

Comments
 (0)