Skip to content
This repository was archived by the owner on Sep 22, 2020. It is now read-only.

Commit cf5b3d0

Browse files
committed
handle endpoint option in config, fix delay integer parsing, improve shutdown behavior
1 parent 728679e commit cf5b3d0

File tree

3 files changed

+29
-4
lines changed

3 files changed

+29
-4
lines changed

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
"beanstalk",
1717
"ampq",
1818
"queue",
19-
"faktory"
19+
"faktory",
20+
"sqs"
2021
],
2122
"peerDependencies": {
2223
"grind-cli": "^0.7.0",

src/Drivers/SQSDriver.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,11 @@ export class SQSDriver extends BaseDriver {
3131

3232
const params = {
3333
QueueUrl: queueUrl,
34-
MessageBody: JSON.stringify(payload),
35-
DelaySeconds: payload.delay || 0
34+
MessageBody: JSON.stringify(payload)
35+
}
36+
37+
if(payload.delay > 0) {
38+
params.DelaySeconds = Math.round(payload.delay / 1000)
3639
}
3740

3841
return this.client.put(params)
@@ -98,6 +101,7 @@ export class SQSDriver extends BaseDriver {
98101
}
99102

100103
destroy() {
104+
this.client.isShutdown = true
101105
this.client.constructor.queueUrls = { }
102106
return super.destroy()
103107
}

src/Support/SQS.js

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ export class SQS {
3232

3333
client = null
3434
queueConfigs = null
35+
isShutdown = false
3536

3637
constructor(config) {
3738
loadPackage()
@@ -47,6 +48,12 @@ export class SQS {
4748
serviceConfig.secretAccessKey = config.secret_key
4849
}
4950

51+
// Point endpoint to a mock SQS server for local testing
52+
// for example, use elasticmq standalone server or docker image
53+
if(!config.endpoint.isNil) {
54+
serviceConfig.endpoint = config.endpoint
55+
}
56+
5057
this.client = new sqs(serviceConfig)
5158
}
5259

@@ -161,6 +168,10 @@ export class SQS {
161168
}
162169

163170
async watch(queue, concurrency, handler) {
171+
if(this.isShutdown) {
172+
return
173+
}
174+
164175
const queueUrl = this.queueUrls[queue]
165176

166177
if(queueUrl.isNil) {
@@ -177,7 +188,11 @@ export class SQS {
177188

178189
const messages = await new Promise(resolve => {
179190
return this.client.receiveMessage(params, (err, data) => {
180-
if(err) {
191+
if(!err.isNil) {
192+
if(!this.isShutdown) {
193+
Log.error(err)
194+
}
195+
181196
return resolve()
182197
} else if((data.Messages === void 0) || (data.Messages.length === 0)) {
183198
return resolve()
@@ -187,6 +202,11 @@ export class SQS {
187202
})
188203
})
189204

205+
// No built-in way to interrupt receiveMessage polling once it starts, so shutdown exits before handling
206+
if(this.isShutdown) {
207+
return
208+
}
209+
190210
if(Array.isArray(messages)) {
191211
// Delete jobs from SQS so they are not reprocessed, then process them
192212
await this.deleteFromQueue(queueUrl, messages)

0 commit comments

Comments
 (0)