Skip to content

Commit d82bba0

Browse files
Merge pull request #7 from valtech-sd/batch-consume-fix
fix(pencil): Optimize ack and nack in batch consume
2 parents 592ede1 + 5300ad4 commit d82bba0

File tree

1 file changed

+10
-20
lines changed

1 file changed

+10
-20
lines changed

src/helpers/message_batching_manager.ts

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -80,23 +80,17 @@ export default class MessageBatchingManager {
8080
* ackMessageList
8181
* Ack all messages in list
8282
* Do this by...
83-
* 1. Loop over all messages in list and ack them
83+
* 1. Ack the last message using allUpTo argumetn to specify that all messages up to the last should be nacked
8484
*
8585
* @param channel: Channel - Channel
8686
* @param messageList: Array<ConsumeMessage> - Messages to be acked
8787
*/
88-
ackMessageList(
89-
channel: Channel,
90-
messageList: Array<ConsumeMessage>,
91-
allUpTo?: boolean
92-
) {
88+
ackMessageList(channel: Channel, messageList: Array<ConsumeMessage>) {
9389
if (this.logger) {
9490
this.logger.trace(`MessageBatchingManager.ackMessageList: Start`);
9591
}
96-
// 1. Loop over all messages in list and ack them
97-
for (let msg of messageList) {
98-
channel.ack(msg, allUpTo);
99-
}
92+
// 1. Ack the last message using allUpTo argumetn to specify that all messages up to the last should be nacked
93+
channel.ack(messageList[messageList.length - 1], true);
10094
if (this.logger) {
10195
this.logger.trace(`MessageBatchingManager.ackMessageList: End`);
10296
}
@@ -106,24 +100,21 @@ export default class MessageBatchingManager {
106100
* nackMessageList
107101
* Nack all messages in list
108102
* Do this by...
109-
* 1. Loop over all messages in list and nack them
103+
* 1. Nack the last message using the allUpTo argument to specify that all messages up to the last should be nacked
110104
*
111105
* @param channel: Channel - Channel
112106
* @param messageList: Array<ConsumeMessage> - Messages to be nacked
113107
*/
114108
nackMessageList(
115109
channel: Channel,
116110
messageList: Array<ConsumeMessage>,
117-
allUpTo?: boolean,
118111
requeue?: boolean
119112
) {
120113
if (this.logger) {
121114
this.logger.trace(`MessageBatchingManager.nackMessageList: Start`);
122115
}
123-
// 1. Loop over all messages in list and nack them
124-
for (let msg of messageList) {
125-
channel.nack(msg, allUpTo, requeue);
126-
}
116+
// 1. Nack the last message using the allUpTo argument to specify that all messages up to the last should be nacked
117+
channel.nack(messageList[messageList.length - 1], true, requeue);
127118
if (this.logger) {
128119
this.logger.trace(`MessageBatchingManager.nackMessageList: End`);
129120
}
@@ -158,10 +149,9 @@ export default class MessageBatchingManager {
158149
},
159150
totalSizeInBytes: bufferSize,
160151
messages: unackedMessageList,
161-
ackAll: (allUpTo?: boolean) =>
162-
this.ackMessageList(channel, unackedMessageList, allUpTo),
163-
nackAll: (allUpTo?: boolean, requeue?: boolean) =>
164-
this.nackMessageList(channel, unackedMessageList, allUpTo, requeue),
152+
ackAll: () => this.ackMessageList(channel, unackedMessageList),
153+
nackAll: (requeue?: boolean) =>
154+
this.nackMessageList(channel, unackedMessageList, requeue),
165155
};
166156
await handler(channel, messages);
167157

0 commit comments

Comments
 (0)