Skip to content

Commit

Permalink
better handling for Failed batch memebers, handle QDnE errors
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanwitt committed Jan 8, 2024
1 parent 0ed1d1e commit a33f7f9
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 19 deletions.
37 changes: 27 additions & 10 deletions src/enqueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ export function formatMessage (command, id) {
return message
}


// Retry happens within the context of the send functions
const retryableExceptions = [
RequestThrottled,
Expand Down Expand Up @@ -219,7 +218,7 @@ export async function sendMessageBatch (qrl, messages, opt) {
Object.assign({ DelaySeconds: opt.delay }, message))
}
if (opt.sentryDsn) {
addBreadcrumb({ category: 'sendMessageBatch', 'message': JSON.stringify({ params }), level: 'debug' })
addBreadcrumb({ category: 'sendMessageBatch', message: JSON.stringify({ params }), level: 'debug' })
}
debug({ params })

Expand All @@ -235,14 +234,33 @@ export async function sendMessageBatch (qrl, messages, opt) {
}
const shouldRetry = (result, error) => {
debug({ shouldRetry: { error, result } })
if (opt.sentryDsn) {
addBreadcrumb({ category: 'sendMessageBatch', 'message': JSON.stringify({ error }), level: 'error' })
if (result) {
// Handle failed result of one or more messages in the batch
if (result.Failed && result.Failed.length) {
for (const failed of result.Failed) {
// Find corresponding messages
const original = params.Entries.find((e) => e.Id === failed.Id)
const info = { failed, original, opt }
if (opt.sentryDsn) {
addBreadcrumb({ category: 'sendMessageBatch', message: 'Failed message: ' + JSON.stringify(info), level: 'error' })
} else {
console.error(info)
}
}
throw new Error('One or more message failures: ' + JSON.stringify(result.Failed))
}
}
for (const exceptionClass of retryableExceptions) {
debug({ exceptionClass, retryableExceptions })
if (error instanceof exceptionClass) {
debug({ sendMessageRetryingBecause: { error, result } })
return true
if (error) {
// Handle a failed result from an overall error on request
if (opt.sentryDsn) {
addBreadcrumb({ category: 'sendMessageBatch', message: JSON.stringify({ error }), level: 'error' })
}
for (const exceptionClass of retryableExceptions) {
debug({ exceptionClass, retryableExceptions })
if (error instanceof exceptionClass) {
debug({ sendMessageRetryingBecause: { error, result } })
return true
}
}
}
}
Expand Down Expand Up @@ -353,7 +371,6 @@ export async function enqueueBatch (pairs, options) {
// so go back through the list of pairs and fire off messages
requestCount = 0
let initialFlushTotal = 0
const addMessagePromises = []
for (const { qname, command } of normalizedPairs) {
const qrl = await getOrCreateQueue(qname, opt)
initialFlushTotal += await addMessage(qrl, command, opt)
Expand Down
27 changes: 21 additions & 6 deletions src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import {
ChangeMessageVisibilityCommand,
ReceiveMessageCommand,
DeleteMessageCommand
DeleteMessageCommand,
QueueDoesNotExist
} from '@aws-sdk/client-sqs'
import { exec } from 'child_process' // node:child_process
import treeKill from 'tree-kill'
Expand Down Expand Up @@ -208,11 +209,25 @@ export async function listen (queues, options) {
chalk.blue(' (' + qrl + ')')
)
}
// Aggregate the results
const { noJobs, jobsSucceeded, jobsFailed } = await pollForJobs(qname, qrl, opt)
stats.noJobs += noJobs
stats.jobsFailed += jobsFailed
stats.jobsSucceeded += jobsSucceeded
try {
// Aggregate the results
const { noJobs, jobsSucceeded, jobsFailed } = await pollForJobs(qname, qrl, opt)
stats.noJobs += noJobs
stats.jobsFailed += jobsFailed
stats.jobsSucceeded += jobsSucceeded
} catch (e) {
if (e instanceof QueueDoesNotExist) {
if (opt.verbose) {
console.error(
chalk.yellow('Warning: Queue ') +
qname.slice(opt.prefix.length) +
chalk.yellow(' does not exist.')
)
}
} else {
throw e
}
}
}
return stats
}
Expand Down
3 changes: 1 addition & 2 deletions test/enqueue.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import {
SendMessageCommand,
SendMessageBatchCommand,
QueueDoesNotExist,
RequestThrottled,
KmsThrottled
RequestThrottled
} from '@aws-sdk/client-sqs'
import { mockClient } from 'aws-sdk-client-mock'
import 'aws-sdk-client-mock-jest'
Expand Down
1 change: 0 additions & 1 deletion test/exponentialBackoff.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { jest } from '@jest/globals'
import { ExponentialBackoff } from '../src/exponentialBackoff.js'

describe('ExpontentialBackoff', () => {

beforeAll(() => {
jest.useFakeTimers()
jest.spyOn(global, 'setTimeout')
Expand Down

0 comments on commit a33f7f9

Please sign in to comment.