From a080150c411a7f60ea1965884c2602d269128c49 Mon Sep 17 00:00:00 2001 From: Ryan Witt Date: Wed, 6 Mar 2024 15:38:49 -0500 Subject: [PATCH] check in the finally debugged initial version of check command --- src/check.js | 216 ++++++++++++++++++++++++++++++++++++++++++++++++ src/cli.js | 81 +++++++++++++++++- src/defaults.js | 13 ++- src/enqueue.js | 140 +++++++++++++++++++++---------- 4 files changed, 399 insertions(+), 51 deletions(-) create mode 100644 src/check.js diff --git a/src/check.js b/src/check.js new file mode 100644 index 0000000..f6469da --- /dev/null +++ b/src/check.js @@ -0,0 +1,216 @@ +import chalk from 'chalk' +import Debug from 'debug' +import { + GetQueueAttributesCommand, + SetQueueAttributesCommand, + QueueDoesNotExist, + RequestThrottled, + KmsThrottled +} from '@aws-sdk/client-sqs' + +import { + qrlCacheGet, + normalizeQueueName, + normalizeFailQueueName, + normalizeDLQName, + getQnameUrlPairs +} from './qrlCache.js' +import { getSQSClient } from './sqs.js' +import { + getDLQParams, + getFailParams, + getQueueParams, + getOrCreateQueue, + getOrCreateFailQueue, + getOrCreateDLQ +} from './enqueue.js' +import { getOptionsWithDefaults } from './defaults.js' +import { ExponentialBackoff } from './exponentialBackoff.js' + +const debug = Debug('qdone:check') + +/** + * Loops through attributes, checking each and returning true if they match + */ +export function attributesMatch (current, desired, opt, indent = '') { + let match = true + for (const attribute in desired) { + if (current[attribute] !== desired[attribute]) { + if (opt.verbose) console.error(chalk.yellow(indent + 'Attribute mismatch: ') + attribute + chalk.yellow(' should be ') + desired[attribute] + chalk.yellow(' but is ') + current[attribute]) + match = false + } + } + return match +} + +/** + * Checks a DLQ, creating if the create option is set and modifying it if the + * overwrite option is set. + */ +export async function checkDLQ (queue, qrl, opt, indent = '') { + debug({ checkDLQ: { queue, qrl } }) + const dqname = normalizeDLQName(queue, opt) + if (opt.verbose) console.error(chalk.blue(indent + 'checking ') + dqname) + + // Check DLQ + let dqrl + try { + dqrl = await qrlCacheGet(dqname) + } catch (e) { + if (!(e instanceof QueueDoesNotExist)) throw e + if (opt.verbose) console.error(chalk.red(indent + ' does not exist')) + if (opt.create) { + if (opt.verbose) console.error(chalk.green(indent + ' creating')) + dqrl = await getOrCreateDLQ(queue, opt) + } else { + return + } + } + + // Check attributes + const { params: { Attributes: desired } } = getDLQParams(queue, opt) + const { Attributes: current } = await getQueueAttributes(dqrl) + if (attributesMatch(current, desired, opt, indent + ' ')) { + if (opt.verbose) console.error(chalk.green(indent + ' all good')) + } else { + if (opt.overwrite) { + if (opt.verbose) console.error(chalk.green(indent + ' modifying')) + return setQueueAttributes(dqrl, desired) + } + } +} + +/** + * Checks a fail queue, creating if the create option is set and modifying it if the + * overwrite option is set. + */ +export async function checkFailQueue (queue, qrl, opt, indent = '') { + // Check dead first + await checkDLQ(queue, qrl, opt, indent) + + // Check fail queue + const fqname = normalizeFailQueueName(queue, opt) + let fqrl + try { + fqrl = await qrlCacheGet(fqname) + } catch (e) { + if (!(e instanceof QueueDoesNotExist)) throw e + if (opt.verbose) console.error(chalk.red(indent + ' does not exist')) + if (opt.create) { + if (opt.verbose) console.error(chalk.green(indent + ' creating')) + fqrl = await getOrCreateFailQueue(queue, opt) + } else { + return + } + } + + try { + // Get fail queue params, creating fail queue if it doesn't exist and create flag is set + if (opt.verbose) console.error(chalk.blue(indent + 'checking ') + fqname) + const { params: { Attributes: desired } } = await getFailParams(queue, opt) + const { Attributes: current } = await getQueueAttributes(fqrl) + if (attributesMatch(current, desired, opt, indent + ' ')) { + if (opt.verbose) console.error(chalk.green(indent + ' all good')) + } else { + if (opt.overwrite) { + if (opt.verbose) console.error(chalk.green(indent + ' modifying')) + return setQueueAttributes(fqrl, desired) + } + } + } catch (e) { + if (!(e instanceof QueueDoesNotExist)) throw e + if (opt.verbose) console.error(chalk.red(indent + ' missing dlq')) + } +} + +/** + * Checks a queue, creating or modifying it if the create option is set + * and it needs it. + */ +export async function checkQueue (queue, qrl, opt, indent = '') { + const qname = normalizeQueueName(queue, opt) + if (opt.verbose) console.error(chalk.blue(indent + 'checking ') + qname) + await checkFailQueue(queue, qrl, opt, indent + ' ') + try { + const { params: { Attributes: desired } } = await getQueueParams(queue, opt) + const { Attributes: current, $metadata } = await getQueueAttributes(qrl) + debug({ current, $metadata }) + if (attributesMatch(current, desired, opt, indent + ' ')) { + if (opt.verbose) console.error(chalk.green(indent + ' all good')) + } else { + if (opt.overwrite) { + if (opt.verbose) console.error(chalk.green(indent + ' modifying')) + return setQueueAttributes(qrl, desired) + } + } + } catch (e) { + if (!(e instanceof QueueDoesNotExist)) throw e + if (opt.verbose) console.error(chalk.red(indent + ' missing fail queue')) + } +} + +export async function getQueueAttributes (qrl) { + debug('getQueueAttributes(', qrl, ')') + const client = getSQSClient() + const params = { AttributeNames: ['All'], QueueUrl: qrl } + const cmd = new GetQueueAttributesCommand(params) + // debug({ cmd }) + const data = await client.send(cmd) + debug('GetQueueAttributes returned', data) + return data +} + +export async function setQueueAttributes (qrl, attributes) { + debug('setQueueAttributes(', qrl, attributes, ')') + const client = getSQSClient() + const params = { Attributes: attributes, QueueUrl: qrl } + const cmd = new SetQueueAttributesCommand(params) + debug({ cmd }) + const data = await client.send(cmd) + debug('SetQueueAttributes returned', data) + return data +} + +// Retry happens within the context of the send functions +const retryableExceptions = [ + RequestThrottled, + KmsThrottled, + QueueDoesNotExist // Queue could temporarily not exist due to eventual consistency, let it retry +] + +// +// Enqueue a single command +// Returns a promise for the SQS API response. +// +export async function check (queues, options) { + debug('check(', { queues }, ')') + const opt = getOptionsWithDefaults(options) + + // Start processing + if (opt.verbose) console.error(chalk.blue('Resolving queues: ') + queues.join(' ')) + const qnames = queues.map(queue => normalizeQueueName(queue, opt)) + const pairs = await getQnameUrlPairs(qnames, opt) + + // Figure out which queues we want to listen on, choosing between active and + // all, filtering out failed queues if the user wants that + const selectedPairs = pairs + .filter(({ qname }) => qname) + .filter(({ qname }) => { + const suf = opt.failSuffix + (opt.fifo ? '.fifo' : '') + const deadSuf = opt.dlqSuffix + (opt.fifo ? '.fifo' : '') + const isFailQueue = qname.slice(-suf.length) === suf + const isDeadQueue = qname.slice(-deadSuf.length) === deadSuf + const isPlain = !isFailQueue && !isDeadQueue + const shouldInclude = isPlain || (isFailQueue && opt.includeFailed) || (isDeadQueue && opt.includeDead) + return shouldInclude + }) + + for (const { qname, qrl } of selectedPairs) { + debug({ qname, qrl }) + await checkQueue(qname, qrl, opt) + } + + debug({ pairs }) +} + +debug('loaded') \ No newline at end of file diff --git a/src/cli.js b/src/cli.js index 28005d4..6732860 100644 --- a/src/cli.js +++ b/src/cli.js @@ -57,8 +57,23 @@ const enqueueOptionDefinitions = [ { name: 'delay', type: Number, description: 'Delays delivery of the enqueued message by the given number of seconds (up to 900 seconds, or 15 minutes). Defaults to immediate delivery (no delay).' }, { name: 'fail-delay', type: Number, description: 'Delays delivery of all messages on this queue by the given number of seconds (up to 900 seconds, or 15 minutes). Only takes effect if this queue is created during this enqueue operation. Defaults to immediate delivery (no delay).' }, { name: 'dlq', type: Boolean, description: 'Send messages from the failed queue to a DLQ.' }, - { name: 'dql-suffix', type: String, description: `Suffix to append to each queue to generate DLQ name [default: ${defaults.dlqSuffix}]` }, - { name: 'dql-after', type: String, description: `Drives message to the DLQ after this many failures in the failed queue. [default: ${defaults.dlqAfter}]` }, + { name: 'dlq-suffix', type: String, description: `Suffix to append to each queue to generate DLQ name [default: ${defaults.dlqSuffix}]` }, + { name: 'dlq-after', type: String, description: `Drives message to the DLQ after this many failures in the failed queue. [default: ${defaults.dlqAfter}]` }, + { name: 'tag', type: String, multiple: true, description: 'Adds an AWS tag to queue creation. Use the format Key=Value. Can specify multiple times.' } +] + +const checkOptionDefinitions = [ + { name: 'create', type: Boolean, description: 'Create queues that do not exist' }, + { name: 'overwrite', type: Boolean, description: 'Overwrite queue attributes that do not match expected' }, + { name: 'fifo', alias: 'f', type: Boolean, description: 'Create new queues as FIFOs' }, + { name: 'include-failed', type: Boolean, description: 'When using \'*\' do not ignore fail queues.' }, + { name: 'include-dead', type: Boolean, description: 'When using \'*\' do not ignore dead queues.' }, + { name: 'message-retention-period', type: Number, description: `Number of seconds to retain jobs (up to 14 days). [default: ${defaults.messageRetentionPeriod}]` }, + { name: 'delay', type: Number, description: 'Delays delivery of the enqueued message by the given number of seconds (up to 900 seconds, or 15 minutes). Defaults to immediate delivery (no delay).' }, + { name: 'fail-delay', type: Number, description: 'Delays delivery of all messages on this queue by the given number of seconds (up to 900 seconds, or 15 minutes). Only takes effect if this queue is created during this enqueue operation. Defaults to immediate delivery (no delay).' }, + { name: 'dlq', type: Boolean, description: 'Send messages from the failed queue to a DLQ.' }, + { name: 'dlq-suffix', type: String, description: `Suffix to append to each queue to generate DLQ name [default: ${defaults.dlqSuffix}]` }, + { name: 'dlq-after', type: String, description: `Drives message to the DLQ after this many failures in the failed queue. [default: ${defaults.dlqAfter}]` }, { name: 'tag', type: String, multiple: true, description: 'Adds an AWS tag to queue creation. Use the format Key=Value. Can specify multiple times.' } ] @@ -123,6 +138,64 @@ export async function enqueue (argv, testHook) { return result } +export async function check (argv, testHook) { + const optionDefinitions = [].concat(checkOptionDefinitions, globalOptionDefinitions) + const usageSections = [ + { content: 'usage: qdone check [options] ', raw: true }, + { content: 'Options', raw: true }, + { optionList: optionDefinitions }, + { content: 'SQS API Call Complexity', raw: true, long: true }, + { + content: [ + { count: '2 [ + 3 ]', summary: 'one call to resolve the queue name\none call to check the command\none extra calls if the queue does not match and --modify option is set' } + ], + long: true + }, + awsUsageHeader, awsUsageBody + ] + debug('check argv', argv) + + // Parse command and options + let queues, options + try { + options = commandLineArgs(optionDefinitions, { argv, partial: true }) + setupVerbose(options) + debug('check options', options) + if (options.help) return Promise.resolve(console.log(getUsage(usageSections))) + if (!options._unknown || options._unknown.length === 0) throw new UsageError('check requres one or more arguments') + queues = options._unknown + debug('queues', queues) + } catch (err) { + console.log(getUsage(usageSections.filter(s => !s.long))) + throw err + } + + // Process tags + if (options.tag && options.tag.length) { + options.tags = {} + for (const input of options.tag) { + debug({ input }) + if (input.indexOf('=') === -1) throw new UsageError('Tags must be separated with the "=" character.') + const [key, ...rest] = input.split('=') + const value = rest.join('=') + debug({ input, key, rest, value, tags: options.tags }) + options.tags[key] = value + } + } + + // Load module after AWS global load + setupAWS(options) + const { check: checkOriginal } = await import('./check.js') + const check = testHook || checkOriginal + + // Normal (non batch) enqueue + const opt = getOptionsWithDefaults(options) + const result = ( + await withSentry(async () => check(queues, opt), opt) + ) + return result +} + const monitorOptionDefinitions = [ { name: 'save', alias: 's', type: Boolean, description: 'Saves data to CloudWatch' } ] @@ -452,7 +525,7 @@ export async function idleQueues (argv, testHook) { } export async function root (originalArgv, testHook) { - const validCommands = [null, 'enqueue', 'enqueue-batch', 'worker', 'idle-queues', 'monitor'] + const validCommands = [null, 'enqueue', 'enqueue-batch', 'worker', 'idle-queues', 'monitor', 'check'] const usageSections = [ { content: 'qdone - Command line job queue for SQS', raw: true, long: true }, { content: 'usage: qdone [options] ', raw: true }, @@ -505,6 +578,8 @@ export async function root (originalArgv, testHook) { return idleQueues(argv, testHook) } else if (command === 'monitor') { return monitor(argv, testHook) + } else if (command === 'check') { + return check(argv, testHook) } } diff --git a/src/defaults.js b/src/defaults.js index 4920cf3..149d1eb 100644 --- a/src/defaults.js +++ b/src/defaults.js @@ -47,7 +47,10 @@ export const defaults = Object.freeze({ // Idle Queues idleFor: 60, delete: false, - unpair: false + unpair: false, + + // Check + create: false }) function validateInteger (opt, name) { @@ -69,7 +72,7 @@ export function getOptionsWithDefaults (options) { if (!options) options = {} // Activate DLQ if any option is set - const dlq = options.dlq || !!(options['dlq-suffix'] || options['dlq-after'] || options['dlq-name']) + const dlq = options.dlq || !!(options['dlq-suffix'] || options['dlq-after'] || options['dlq-name'] || options.dlqSuffix || options.dlqAfter || options.dlqName) const opt = { // Shared @@ -117,7 +120,11 @@ export function getOptionsWithDefaults (options) { // Idle Queues idleFor: options.idleFor || options['idle-for'] || defaults.idleFor, delete: options.delete || defaults.delete, - unpair: options.delete || defaults.unpair + unpair: options.delete || defaults.unpair, + + // Check + create: options.create || defaults.create, + overwrite: options.overwrite || defaults.overwrite } // Setting this env here means we don't have to in AWS SDK constructors diff --git a/src/enqueue.js b/src/enqueue.js index d805b87..8f826df 100644 --- a/src/enqueue.js +++ b/src/enqueue.js @@ -31,9 +31,20 @@ import { ExponentialBackoff } from './exponentialBackoff.js' const debug = Debug('qdone:enqueue') +export function getDLQParams (queue, opt) { + const dqname = normalizeDLQName(queue, opt) + const params = { + Attributes: { MessageRetentionPeriod: opt.messageRetentionPeriod + '' }, + QueueName: dqname + } + if (opt.tags) params.tags = opt.tags + if (opt.fifo) params.Attributes.FifoQueue = 'true' + return { dqname, params } +} + export async function getOrCreateDLQ (queue, opt) { debug('getOrCreateDLQ(', queue, ')') - const dqname = normalizeDLQName(queue, opt) + const { dqname, params } = getDLQParams(queue, opt) try { const dqrl = await qrlCacheGet(dqname) return dqrl @@ -43,12 +54,6 @@ export async function getOrCreateDLQ (queue, opt) { // Create our DLQ const client = getSQSClient() - const params = { - Attributes: { MessageRetentionPeriod: opt.messageRetentionPeriod + '' }, - QueueName: dqname - } - if (opt.tags) params.tags = opt.tags - if (opt.fifo) params.Attributes.FifoQueue = 'true' const cmd = new CreateQueueCommand(params) if (opt.verbose) console.error(chalk.blue('Creating dead letter queue ') + dqname) const data = await client.send(cmd) @@ -59,35 +64,61 @@ export async function getOrCreateDLQ (queue, opt) { } } +/** + * Returns the parameters needed for creating a failed queue. If DLQ options + * are set, it makes an API call to get this DLQ's ARN. + */ +export async function getFailParams (queue, opt) { + const fqname = normalizeFailQueueName(queue, opt) + const params = { + Attributes: { MessageRetentionPeriod: opt.messageRetentionPeriod + '' }, + QueueName: fqname + } + // If we have a dlq, we grab it and set a redrive policy + if (opt.dlq) { + const dqname = normalizeDLQName(queue, opt) + const dqrl = await qrlCacheGet(dqname) + const dqa = await getQueueAttributes(dqrl) + debug('dqa', dqa) + params.Attributes.RedrivePolicy = JSON.stringify({ + deadLetterTargetArn: dqa.Attributes.QueueArn, + maxReceiveCount: opt.dlqAfter + }) + } + if (opt.failDelay) params.Attributes.DelaySeconds = opt.failDelay + '' + if (opt.tags) params.tags = opt.tags + if (opt.fifo) params.Attributes.FifoQueue = 'true' + return { fqname, params } +} + +/** + * Returns the qrl for the failed queue for the given queue. Creates the queue + * if it does not exist. + */ export async function getOrCreateFailQueue (queue, opt) { debug('getOrCreateFailQueue(', queue, ')') - const fqname = normalizeFailQueueName(queue, opt) try { + const fqname = normalizeFailQueueName(queue, opt) const fqrl = await qrlCacheGet(fqname) return fqrl } catch (err) { // Anything other than queue doesn't exist gets re-thrown if (!(err instanceof QueueDoesNotExist)) throw err - // Crate our fail queue + // Grab params, creating DLQ if needed + const { fqname, params } = await (async () => { + try { + return getFailParams(queue, opt) + } catch (e) { + // If DLQ doesn't exist, create it + if (!(opt.dlq && e instanceof QueueDoesNotExist)) throw e + await getOrCreateDLQ(queue, opt) + return getFailParams(queue, opt) + } + })() + + // Create our fail queue const client = getSQSClient() - const params = { - Attributes: { MessageRetentionPeriod: opt.messageRetentionPeriod + '' }, - QueueName: fqname - } - // If we have a dlq, we grab it and set a redrive policy - if (opt.dlq) { - const dqrl = await getOrCreateDLQ(queue, opt) - const dqa = await getQueueAttributes(dqrl) - debug('dqa', dqa) - params.Attributes.RedrivePolicy = JSON.stringify({ - deadLetterTargetArn: dqa.Attributes.QueueArn, - maxReceiveCount: opt.dlqAfter + '' - }) - } - if (opt.failDelay) params.Attributes.DelaySeconds = opt.failDelay + '' - if (opt.tags) params.tags = opt.tags - if (opt.fifo) params.Attributes.FifoQueue = 'true' const cmd = new CreateQueueCommand(params) if (opt.verbose) console.error(chalk.blue('Creating fail queue ') + fqname) const data = await client.send(cmd) @@ -98,42 +129,61 @@ export async function getOrCreateFailQueue (queue, opt) { } } +/** + * Returns the parameters needed for creating a queue. If fail options + * are set, it makes an API call to get the fail queue's ARN. + */ +export async function getQueueParams (queue, opt) { + const qname = normalizeQueueName(queue, opt) + const fqname = normalizeFailQueueName(queue, opt) + const fqrl = await qrlCacheGet(fqname, opt) + const fqa = await getQueueAttributes(fqrl) + const params = { + Attributes: { + MessageRetentionPeriod: opt.messageRetentionPeriod + '', + RedrivePolicy: JSON.stringify({ + deadLetterTargetArn: fqa.Attributes.QueueArn, + maxReceiveCount: 1 + }) + }, + QueueName: qname + } + if (opt.tags) params.tags = opt.tags + if (opt.fifo) params.Attributes.FifoQueue = 'true' + return { qname, params } +} + /** * Returns a qrl for a queue that either exists or does not */ export async function getOrCreateQueue (queue, opt) { debug('getOrCreateQueue(', queue, ')') - const qname = normalizeQueueName(queue, opt) try { + const qname = normalizeQueueName(queue, opt) const qrl = await qrlCacheGet(qname) return qrl } catch (err) { // Anything other than queue doesn't exist gets re-thrown if (!(err instanceof QueueDoesNotExist)) throw err - // Get our fail queue so we can create our own - const fqrl = await getOrCreateFailQueue(qname, opt) - const fqa = await getQueueAttributes(fqrl) + // Grab params, creating DLQ if needed + const { qname, params } = await (async () => { + try { + return getQueueParams(queue, opt) + } catch (e) { + // If DLQ doesn't exist, create it + if (!(opt.dlq && e instanceof QueueDoesNotExist)) throw e + await getOrCreateDLQ(queue, opt) + return getQueueParams(queue, opt) + } + })() // Create our queue const client = getSQSClient() - const params = { - Attributes: { - MessageRetentionPeriod: opt.messageRetentionPeriod + '', - RedrivePolicy: JSON.stringify({ - deadLetterTargetArn: fqa.Attributes.QueueArn, - maxReceiveCount: '1' - }) - }, - QueueName: qname - } - if (opt.tags) params.tags = opt.tags - if (opt.fifo) params.Attributes.FifoQueue = 'true' const cmd = new CreateQueueCommand(params) - debug({ params }) - if (opt.verbose) console.error(chalk.blue('Creating queue ') + qname) + if (opt.verbose) console.error(chalk.blue('Creating fail queue ') + qname) const data = await client.send(cmd) - debug('createQueue returned', data) + debug('AWS createQueue returned', data) const qrl = data.QueueUrl qrlCacheSet(qname, qrl) return qrl