From 037479d12b1a7ac00ca0f3dd5f9070d68b115011 Mon Sep 17 00:00:00 2001 From: Ryan Witt Date: Wed, 10 Jul 2024 17:55:15 -0400 Subject: [PATCH] fix monitor bug, delete orphaned idle queues, validate more strings --- src/defaults.js | 11 +++++++++ src/idleQueues.js | 58 ++++++++++++++-------------------------------- src/monitor.js | 3 ++- test/dedup.test.js | 2 ++ 4 files changed, 33 insertions(+), 41 deletions(-) diff --git a/src/defaults.js b/src/defaults.js index 54155f3..b69816b 100644 --- a/src/defaults.js +++ b/src/defaults.js @@ -60,6 +60,12 @@ export function validateInteger (opt, name) { return parsed } +export function validateQueueName (opt, name) { + if (typeof name !== 'string') throw new Error(`${name} must be a string.`) + if (!name.match(/^[a-z0-9-_]+$/i)) throw new Error(`${name} can contain only numbers, letters, hypens and underscores.`) + return name +} + export function validateMessageOptions (messageOptions) { const validKeys = ['deduplicationId', 'groupId'] if (typeof messageOptions === 'object' && @@ -158,6 +164,11 @@ export function getOptionsWithDefaults (options) { opt.maxMemoryPercent = validateInteger(opt, 'maxMemoryPercent') opt.idleFor = validateInteger(opt, 'idleFor') + validateQueueName(opt, 'region') + validateQueueName(opt, 'prefix') + validateQueueName(opt, 'failSuffix') + validateQueueName(opt, 'dlqSuffix') + // Validate dedup args if (opt.externalDedup && !opt.cacheUri) throw new Error('--external-dedup requires the --cache-uri argument') if (opt.externalDedup && (opt.dedupPeriod < 1)) throw new Error('--external-dedup of redis requires a --dedup-period > 1 second') diff --git a/src/idleQueues.js b/src/idleQueues.js index 6dc4a15..50095c4 100644 --- a/src/idleQueues.js +++ b/src/idleQueues.js @@ -186,35 +186,6 @@ export async function deleteQueue (qname, qrl, opt) { } } -/** - * Processes a single queue, checking for idle, deleting if applicable. - */ -export async function processQueue (qname, qrl, opt) { - const result = await checkIdle(qname, qrl, opt) - debug(qname, result) - - // Queue is active - if (!result.idle) { - // Notify and return - if (opt.verbose) console.error(chalk.blue('Queue ') + qname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'active' + chalk.blue(' in the last ') + opt.idleFor + chalk.blue(' minutes.')) - return result - } - - // Queue is idle - if (opt.verbose) console.error(chalk.blue('Queue ') + qname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'idle' + chalk.blue(' for the last ') + opt.idleFor + chalk.blue(' minutes.')) - if (opt.delete) { - const deleteResult = await deleteQueue(qname, qrl, opt) - const resultIncludingDelete = Object.assign(result, { - deleted: deleteResult.deleted, - apiCalls: { - SQS: result.apiCalls.SQS + deleteResult.apiCalls.SQS, - CloudWatch: result.apiCalls.CloudWatch + deleteResult.apiCalls.CloudWatch - } - }) - return resultIncludingDelete - } -} - /** * Processes a queue and its fail and delete queue, treating them as a unit. */ @@ -308,6 +279,14 @@ export async function processQueueSet (qname, qrl, opt) { return result } +// +// Strips failed and dlq suffix from a queue name or URL +// +export function stripSuffixes (queueName, opt) { + const suffixFinder = new RegExp(`(${opt.dlqSuffix}|${opt.failSuffix}){1}(|${fifoSuffix})$`) + return queueName.replace(suffixFinder, '$2') +} + // // Resolve queues for listening loop listen // @@ -322,17 +301,17 @@ export async function idleQueues (queues, options) { console.error() } - // Filter out any queue ending in suffix unless --include-failed is set + // Filter out failed and dead queues, but if we have an orphaned fail or + // dead queue, keep the original parent queue name so that orphans can be + // deleted. + const queueNames = new Set() const filteredEntries = entries.filter(entry => { - const suf = opt.failSuffix - const sufFifo = opt.failSuffix + fifoSuffix - const isFail = entry.qname.endsWith(suf) - const isFifoFail = entry.qname.endsWith(sufFifo) - const sufDead = opt.dlqSuffix - const sufFifoDead = opt.dlqSuffix + fifoSuffix - const isDead = entry.qname.endsWith(sufDead) - const isFifoDead = entry.qname.endsWith(sufFifoDead) - return opt.includeFailed ? true : (!isFail && !isFifoFail && !isDead && !isFifoDead) + const stripped = stripSuffixes(entry.qname, opt) + if (queueNames.has(stripped)) return false + queueNames.add(stripped) + entry.qname = stripped + entry.qrl = stripSuffixes(entry.qrl, opt) + return true }) // But only if we have queues to remove @@ -345,7 +324,6 @@ export async function idleQueues (queues, options) { console.error() } // Check each queue in parallel - if (opt.unpair) return Promise.all(filteredEntries.map(e => processQueue(e.qname, e.qrl, opt))) return Promise.all(filteredEntries.map(e => processQueueSet(e.qname, e.qrl, opt))) } diff --git a/src/monitor.js b/src/monitor.js index 1972991..3d9cda4 100644 --- a/src/monitor.js +++ b/src/monitor.js @@ -13,9 +13,10 @@ const debug = Debug('qdone:monitor') * Splits a queue name with a single wildcard into prefix and suffix regex. */ export async function monitor (queue, save, options) { + if (queue.endsWith('.fifo')) options.fifo = true const opt = getOptionsWithDefaults(options) const queueName = normalizeQueueName(queue, opt) - debug({ opt, queueName }) + debug({ options, opt, queue, queueName }) const data = await getAggregateData(queueName) console.log(data) if (save) { diff --git a/test/dedup.test.js b/test/dedup.test.js index df45001..c595dab 100644 --- a/test/dedup.test.js +++ b/test/dedup.test.js @@ -21,6 +21,8 @@ const options = { cachePrefix: 'qdone:' } +jest.retryTimes(3) + beforeEach(shutdownCache) afterEach(async () => { jest.restoreAllMocks()