Skip to content

Commit

Permalink
initial attempt at pulling retries into qdone
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanwitt committed Jan 7, 2024
1 parent 2f2cec4 commit a169569
Show file tree
Hide file tree
Showing 5 changed files with 338 additions and 24 deletions.
4 changes: 3 additions & 1 deletion src/defaults.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ export const defaults = Object.freeze({
// Enqueue
groupId: uuidv1(),
groupIdPerMessage: false,
deduplicationId: uuidv1(),
deduplicationId: undefined,
messageRetentionPeriod: 1209600,
delay: 0,
sendRetries: 6,
failDelay: 0,
dlq: false,
dlqSuffix: '_dead',
Expand Down Expand Up @@ -79,6 +80,7 @@ export function getOptionsWithDefaults (options) {
deduplicationId: options.deduplicationId || options['deduplication-id'] || defaults.deduplicationId,
messageRetentionPeriod: options.messageRetentionPeriod || options['message-retention-period'] || defaults.messageRetentionPeriod,
delay: options.delay || defaults.delay,
sendRetries: options['send-retries'] || defaults.sendRetries,
failDelay: options.failDelay || options['fail-delay'] || defaults.failDelay,
dlq: dlq || defaults.dlq,
dlqSuffix: options.dlqSuffix || options['dlq-suffix'] || defaults.dlqSuffix,
Expand Down
86 changes: 64 additions & 22 deletions src/enqueue.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
// const Q = require('q')
// const debug = require('debug')('qdone:enqueue')
// const chalk = require('chalk')
// const uuid = require('uuid')
// const qrlCache = require('./qrlCache')
// const AWS = require('aws-sdk')

import { addBreadcrumb } from '@sentry/node'
import { v1 as uuidV1 } from 'uuid'
import chalk from 'chalk'
import Debug from 'debug'
Expand All @@ -13,7 +7,9 @@ import {
GetQueueAttributesCommand,
SendMessageCommand,
SendMessageBatchCommand,
QueueDoesNotExist
QueueDoesNotExist,
RequestThrottled,
KmsThrottled
} from '@aws-sdk/client-sqs'

import {
Expand All @@ -25,6 +21,7 @@ import {
} from './qrlCache.js'
import { getSQSClient } from './sqs.js'
import { getOptionsWithDefaults } from './defaults.js'
import { ExponentialBackoff } from './exponentialBackoff.js'

const debug = Debug('qdone:enqueue')

Expand Down Expand Up @@ -162,21 +159,46 @@ export function formatMessage (command, id) {
return message
}


// Exception classes that the send functions treat as transient. The retry
// itself happens within the context of the send functions: each one passes a
// shouldRetry callback to ExponentialBackoff.run() that checks the thrown
// error against this list with instanceof.
const retryableExceptions = [
RequestThrottled,
KmsThrottled,
QueueDoesNotExist // Queue could temporarily not exist due to eventual consistency, let it retry
]

/**
 * Sends a single message to a queue, retrying with exponential backoff when
 * the send fails with one of the retryableExceptions.
 *
 * @param {string} qrl - Queue URL, used as the QueueUrl request parameter.
 * @param {string} command - Command to enqueue; formatted by formatMessage().
 * @param {object} opt - Options. Reads opt.fifo, opt.groupId,
 *   opt.deduplicationId, opt.delay and opt.sendRetries.
 * @returns {Promise<object>} Whatever client.send() resolved with on the
 *   attempt that succeeded.
 * @throws Rethrows the error from client.send() when it is not retryable, or
 *   when it is retryable but the attempts allowed by opt.sendRetries are used up.
 */
export async function sendMessage (qrl, command, opt) {
  debug('sendMessage(', qrl, command, ')')
  const params = Object.assign({ QueueUrl: qrl }, formatMessage(command))
  // Add in group id if we're using fifo
  if (opt.fifo) {
    params.MessageGroupId = opt.groupId
    // Fall back to a fresh uuid when the caller did not supply a deduplication id
    params.MessageDeduplicationId = opt.deduplicationId || uuidV1()
  }
  if (opt.delay) params.DelaySeconds = opt.delay

  // Build the command once; the same command object is re-sent on every attempt
  const client = getSQSClient()
  const cmd = new SendMessageCommand(params)
  debug({ cmd })

  // An undefined opt.sendRetries falls through to ExponentialBackoff's own default
  const backoff = new ExponentialBackoff(opt.sendRetries)
  const send = async (attemptNumber) => {
    // NOTE(review): this writes a non-SQS field onto the request input;
    // presumably for debugging/tests -- confirm it is safe to send to the service.
    cmd.input.attemptNumber = attemptNumber
    const data = await client.send(cmd)
    debug('sendMessage returned', data)
    return data
  }
  // Only errors that are instances of a retryable exception class trigger a retry
  const shouldRetry = async (result, error) => {
    const retryable = retryableExceptions.some(exceptionClass => error instanceof exceptionClass)
    if (retryable) debug({ sendMessageRetryingBecause: { error, result } })
    return retryable
  }
  const result = await backoff.run(send, shouldRetry)
  debug({ sendMessageResult: result })
  return result
}

export async function sendMessageBatch (qrl, messages, opt) {
Expand All @@ -196,12 +218,35 @@ export async function sendMessageBatch (qrl, messages, opt) {
params.Entries = params.Entries.map(message =>
Object.assign({ DelaySeconds: opt.delay }, message))
}
if (opt.sentryDsn) {
addBreadcrumb({ category: 'sendMessageBatch', 'message': JSON.stringify({ params }), level: 'debug' })
}
debug({ params })

// Send them
const client = getSQSClient()
const cmd = new SendMessageBatchCommand(params)
debug({ cmd })
const data = await client.send(cmd)
debug('sendMessageBatch returned', data)
return data
const backoff = new ExponentialBackoff(opt.sendRetries)
const send = async (attemptNumber) => {
debug({ sendMessageBatchSend: { attemptNumber, params } })
const data = await client.send(cmd)
return data
}
const shouldRetry = (result, error) => {
debug({ shouldRetry: { error, result } })
if (opt.sentryDsn) {
addBreadcrumb({ category: 'sendMessageBatch', 'message': JSON.stringify({ error }), level: 'error' })
}
for (const exceptionClass of retryableExceptions) {
debug({ exceptionClass, retryableExceptions })
if (error instanceof exceptionClass) {
debug({ sendMessageRetryingBecause: { error, result } })
return true
}
}
}
return backoff.run(send, shouldRetry)
}

const messages = {}
Expand Down Expand Up @@ -263,6 +308,7 @@ export async function addMessage (qrl, command, opt) {
const message = formatMessage(command, messageIndex++)
messages[qrl] = messages[qrl] || []
messages[qrl].push(message)
debug({ location: 'addMessage', messages })
if (messages[qrl].length >= 10) {
return flushMessages(qrl, opt)
}
Expand Down Expand Up @@ -306,16 +352,12 @@ export async function enqueueBatch (pairs, options) {
// After we've prefetched, all qrls are in cache
// so go back through the list of pairs and fire off messages
requestCount = 0
let initialFlushTotal = 0
const addMessagePromises = []
for (const { qname, command } of normalizedPairs) {
const qrl = await getOrCreateQueue(qname, opt)
addMessagePromises.push(addMessage(qrl, command, opt))
initialFlushTotal += await addMessage(qrl, command, opt)
}
const flushCounts = await Promise.all(addMessagePromises)

// Count up how many were flushed during add
debug('flushCounts', flushCounts)
const initialFlushTotal = flushCounts.reduce((a, b) => a + b, 0)

// And flush any remaining messages
const extraFlushPromises = []
Expand Down
111 changes: 111 additions & 0 deletions src/exponentialBackoff.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/**
 * Exponential backoff controller.
 * usage:
 *   const exp = new ExponentialBackoff()
 *   const result = await exp.run(
 *     function action (attemptNumber) {
 *       console.log(attemptNumber) // 1, 2, 3, ...
 *       return axios.post(...)
 *     },
 *     function shouldRetry (returnValue, error) {
 *       if (returnValue && returnValue.code === 500) return true
 *       if (error && error.message === 'Internal Server Error') return true
 *     }
 *   )
 */

export class ExponentialBackoff {
  /**
   * Creates various behaviors for backoff.
   * @param {number} maxRetries - Number of times to attempt the action before
   *  throwing an error. Defaults to 3.
   * @param {number} maxJitterPercent - Jitter as a percentage of the delay.
   *  For example, if the exponential delay is 2 seconds, then a jitter of
   *  0.5 could lead to a delay as low as 1 second and as high as 3 seconds,
   *  since 0.5 * 2 = 1. Defaults to 0.5.
   * @param {number} exponentBase - The base for the exponent. Defaults to 2,
   *  which means the delay doubles every attempt.
   * @throws {Error} If any argument is out of range or is not numeric.
   */
  constructor (maxRetries = 3, maxJitterPercent = 0.5, exponentBase = 2) {
    // Parse first, then validate the parsed values. Validating the raw input
    // lets non-numeric values through (every comparison with NaN is false),
    // which would leave run() with a loop that never executes.
    const retries = Number.parseInt(maxRetries, 10)
    const jitterPercent = Number.parseFloat(maxJitterPercent)
    const base = Number.parseFloat(exponentBase)
    // The negated form also rejects NaN.
    if (!(retries >= 1)) throw new Error('maxRetries must be >= 1')
    if (!(jitterPercent >= 0.1 && jitterPercent <= 1)) throw new Error('maxJitterPercent must be in the interval [0.1, 1]')
    if (!(base >= 1 && base <= 10)) throw new Error('exponentBase must be in the range [1, 10]')
    this.maxRetries = retries
    this.maxJitterPercent = jitterPercent
    this.exponentBase = base
    // Kept for backward compatibility; run() tracks its own per-call counter.
    this.attemptNumber = 0
  }

  /**
   * Calculates how many ms to delay based on the current attempt number.
   * @param {number} attemptNumber - Attempt that just finished (1, 2, 3, ...).
   * @returns {number} Delay in whole milliseconds, jitter included.
   */
  calculateDelayMs (attemptNumber) {
    const secondsRaw = this.exponentBase ** attemptNumber // 2, 4, 8, 16, ....
    // Scale Math.random() from [0, 1) to [-1, 1) so the jitter covers the full
    // documented range of +/- maxJitterPercent rather than half of it.
    const jitter = this.maxJitterPercent * (Math.random() * 2 - 1)
    return Math.round(secondsRaw * (1 + jitter) * 1000)
  }

  /**
   * Resolves after a delay set by the current attempt.
   * @param {number} attemptNumber - Attempt that just finished.
   * @returns {Promise<void>} Resolves once the timer fires.
   */
  async delay (attemptNumber) {
    const delayMs = this.calculateDelayMs(attemptNumber)
    return new Promise(resolve => setTimeout(resolve, delayMs))
  }

  /**
   * Call another function repeatedly, retrying with exponential backoff and
   * jitter if not successful.
   * @param {ExponentialBackoff~action} action - Callback that does the action
   *  to be attempted (web request, rpc, database call, etc). Will be called
   *  again after the exponential delay if shouldRetry() returns true.
   * @param {ExponentialBackoff~shouldRetry} shouldRetry - Callback that gets
   *  to look at the return value of action() and any potential exception. If
   *  this returns true then the action will be retried with the appropriate
   *  backoff delay. Defaults to a function that returns true if an exception
   *  is thrown.
   * @returns {Promise<*>} The value returned by the first attempt that
   *  shouldRetry() did not ask to retry.
   * @throws The error thrown by action() when shouldRetry() declines it or
   *  when the attempts run out; an Error('Maximum number of attempts reached')
   *  when the attempts run out on a returned value. Anything shouldRetry()
   *  itself throws propagates unchanged.
   */
  async run (
    action = async (attemptNumber) => undefined,
    shouldRetry = async (returnValue, error) => !!error
  ) {
    for (let attemptNumber = 1; attemptNumber <= this.maxRetries; attemptNumber++) {
      let result
      let error
      let failed = false
      // Only the action is guarded. The retry decision and the exhaustion
      // error are handled outside the try so they are never mistaken for a
      // failure of the action and fed back into shouldRetry().
      try {
        result = await action(attemptNumber)
      } catch (e) {
        failed = true
        error = e
      }

      const retry = failed
        ? await shouldRetry(undefined, error)
        : await shouldRetry(result, undefined)

      if (!retry) {
        if (failed) throw error
        return result
      }
      if (attemptNumber >= this.maxRetries) {
        throw failed ? error : new Error('Maximum number of attempts reached')
      }
      await this.delay(attemptNumber)
    }
  }

  /**
   * Callback used by run().
   * @callback ExponentialBackoff~action
   * @param {number} attemptNumber - Which attempt this is, i.e. 1, 2, 3, ...
   */

  /**
   * Callback used by run().
   * @callback ExponentialBackoff~shouldRetry
   * @param returnValue - The value returned by your action. If an exception
   *  was thrown by the action then this is undefined.
   * @param error - The exception thrown by your action. If there was no
   *  exception, this is undefined.
   */
}
52 changes: 51 additions & 1 deletion test/enqueue.test.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { jest } from '@jest/globals'
import {
CreateQueueCommand,
GetQueueUrlCommand,
GetQueueAttributesCommand,
SendMessageCommand,
SendMessageBatchCommand,
QueueDoesNotExist
QueueDoesNotExist,
RequestThrottled,
KmsThrottled
} from '@aws-sdk/client-sqs'
import { mockClient } from 'aws-sdk-client-mock'
import 'aws-sdk-client-mock-jest'
Expand All @@ -29,6 +32,7 @@ import { loadBatchFiles } from '../src/cli.js'

getSQSClient()
const client = getSQSClient()
jest.useFakeTimers()

// Always clear qrl cache at the beginning of each test
beforeEach(qrlCacheClear)
Expand Down Expand Up @@ -550,6 +554,52 @@ describe('sendMessage', () => {
}, formatMessage(cmd))
)
})

// Verifies that sendMessage() retries after a retryable exception: the mocked
// client rejects the first SendMessageCommand with RequestThrottled and
// resolves the second, and the second command must carry the expected FIFO
// parameters. Relies on the jest.useFakeTimers() call at the top of this file
// so the backoff delay can be flushed without real waiting.
test('retryable exceptions cause retry', async () => {
const groupId = 'foo'
const deduplicationId = 'bar'
const options = {
delay: 15,
fifo: true,
'group-id': groupId,
'deduplication-id': deduplicationId
}
const opt = getOptionsWithDefaults(options)
const qname = 'testqueue'
const qrl = `https://sqs.us-east-1.amazonaws.com/foobar/${qname}`
const cmd = 'sd BulkStatusModel finalizeAll'
const sqsMock = mockClient(client)
const messageId = '1e0632f4-b9e8-4f5c-a8e2-3529af1a56d6'
const md5 = 'foobar'
setSQSClient(sqsMock)
// First send is throttled, second send succeeds
sqsMock
.on(SendMessageCommand, { QueueUrl: qrl })
.rejectsOnce(new RequestThrottled())
// Other retryable exceptions, currently not exercised:
// .rejectsOnce(new KmsThrottled())
// .rejectsOnce(new QueueDoesNotExist())
.resolvesOnce({ MD5OfMessageBody: md5, MessageId: messageId })
// Start the send without awaiting it so the fake timers can be advanced by hand
const promise = sendMessage(qrl, cmd, opt)

// Yield to the microtask queue so the pending async steps run up to the
// point where the backoff timer has been scheduled, then fire it.
// NOTE(review): the number of yields is coupled to how many awaits sit
// between the rejected send and setTimeout() -- re-check if that path changes.
await Promise.resolve() // shouldRetry()
await Promise.resolve() // await this.delay(attemptNumber)
jest.runAllTimers() // delay() -> setTimeout()

await Promise.resolve() // await action
jest.runAllTimers() // NOTE(review): original author was unsure why this second flush is needed -- confirm

// The retried send resolves with the mocked response
await expect(promise).resolves.toEqual({ MD5OfMessageBody: md5, MessageId: messageId })
// The second (retried) command carries the same queue, group, delay and dedup parameters
expect(sqsMock)
.toHaveReceivedNthCommandWith(
2,
SendMessageCommand,
Object.assign({
QueueUrl: qrl,
MessageGroupId: groupId,
DelaySeconds: options.delay,
MessageDeduplicationId: deduplicationId
}, formatMessage(cmd))
)
})
})

describe('sendMessageBatch', () => {
Expand Down
Loading

0 comments on commit a169569

Please sign in to comment.