Skip to content

Commit

Permalink
fix monitor bug, delete orphaned idle queues, validate more strings
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanwitt committed Jul 10, 2024
1 parent 4e51985 commit 037479d
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 41 deletions.
11 changes: 11 additions & 0 deletions src/defaults.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ export function validateInteger (opt, name) {
return parsed
}

export function validateQueueName (opt, name) {
if (typeof name !== 'string') throw new Error(`${name} must be a string.`)
if (!name.match(/^[a-z0-9-_]+$/i)) throw new Error(`${name} can contain only numbers, letters, hypens and underscores.`)
return name
}

export function validateMessageOptions (messageOptions) {
const validKeys = ['deduplicationId', 'groupId']
if (typeof messageOptions === 'object' &&
Expand Down Expand Up @@ -158,6 +164,11 @@ export function getOptionsWithDefaults (options) {
opt.maxMemoryPercent = validateInteger(opt, 'maxMemoryPercent')
opt.idleFor = validateInteger(opt, 'idleFor')

validateQueueName(opt, 'region')
validateQueueName(opt, 'prefix')
validateQueueName(opt, 'failSuffix')
validateQueueName(opt, 'dlqSuffix')

// Validate dedup args
if (opt.externalDedup && !opt.cacheUri) throw new Error('--external-dedup requires the --cache-uri argument')
if (opt.externalDedup && (opt.dedupPeriod < 1)) throw new Error('--external-dedup of redis requires a --dedup-period > 1 second')
Expand Down
58 changes: 18 additions & 40 deletions src/idleQueues.js
Original file line number Diff line number Diff line change
Expand Up @@ -186,35 +186,6 @@ export async function deleteQueue (qname, qrl, opt) {
}
}

/**
* Processes a single queue, checking for idle, deleting if applicable.
*/
export async function processQueue (qname, qrl, opt) {
const result = await checkIdle(qname, qrl, opt)
debug(qname, result)

// Queue is active
if (!result.idle) {
// Notify and return
if (opt.verbose) console.error(chalk.blue('Queue ') + qname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'active' + chalk.blue(' in the last ') + opt.idleFor + chalk.blue(' minutes.'))
return result
}

// Queue is idle
if (opt.verbose) console.error(chalk.blue('Queue ') + qname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'idle' + chalk.blue(' for the last ') + opt.idleFor + chalk.blue(' minutes.'))
if (opt.delete) {
const deleteResult = await deleteQueue(qname, qrl, opt)
const resultIncludingDelete = Object.assign(result, {
deleted: deleteResult.deleted,
apiCalls: {
SQS: result.apiCalls.SQS + deleteResult.apiCalls.SQS,
CloudWatch: result.apiCalls.CloudWatch + deleteResult.apiCalls.CloudWatch
}
})
return resultIncludingDelete
}
}

/**
* Processes a queue and its fail and delete queue, treating them as a unit.
*/
Expand Down Expand Up @@ -308,6 +279,14 @@ export async function processQueueSet (qname, qrl, opt) {
return result
}

//
// Strips failed and dlq suffix from a queue name or URL
//
export function stripSuffixes (queueName, opt) {
const suffixFinder = new RegExp(`(${opt.dlqSuffix}|${opt.failSuffix}){1}(|${fifoSuffix})$`)
return queueName.replace(suffixFinder, '$2')

Check warning on line 287 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L285-L287

Added lines #L285 - L287 were not covered by tests
}

//
// Resolve queues for listening loop listen
//
Expand All @@ -322,17 +301,17 @@ export async function idleQueues (queues, options) {
console.error()
}

// Filter out any queue ending in suffix unless --include-failed is set
// Filter out failed and dead queues, but if we have an orphaned fail or
// dead queue, keep the original parent queue name so that orphans can be
// deleted.
const queueNames = new Set()

Check warning on line 307 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L307

Added line #L307 was not covered by tests
const filteredEntries = entries.filter(entry => {
const suf = opt.failSuffix
const sufFifo = opt.failSuffix + fifoSuffix
const isFail = entry.qname.endsWith(suf)
const isFifoFail = entry.qname.endsWith(sufFifo)
const sufDead = opt.dlqSuffix
const sufFifoDead = opt.dlqSuffix + fifoSuffix
const isDead = entry.qname.endsWith(sufDead)
const isFifoDead = entry.qname.endsWith(sufFifoDead)
return opt.includeFailed ? true : (!isFail && !isFifoFail && !isDead && !isFifoDead)
const stripped = stripSuffixes(entry.qname, opt)

Check warning on line 309 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L309

Added line #L309 was not covered by tests
if (queueNames.has(stripped)) return false
queueNames.add(stripped)
entry.qname = stripped
entry.qrl = stripSuffixes(entry.qrl, opt)
return true

Check warning on line 314 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L311-L314

Added lines #L311 - L314 were not covered by tests
})

// But only if we have queues to remove
Expand All @@ -345,7 +324,6 @@ export async function idleQueues (queues, options) {
console.error()
}
// Check each queue in parallel
if (opt.unpair) return Promise.all(filteredEntries.map(e => processQueue(e.qname, e.qrl, opt)))
return Promise.all(filteredEntries.map(e => processQueueSet(e.qname, e.qrl, opt)))
}

Expand Down
3 changes: 2 additions & 1 deletion src/monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ const debug = Debug('qdone:monitor')
* Splits a queue name with a single wildcard into prefix and suffix regex.
*/
export async function monitor (queue, save, options) {
if (queue.endsWith('.fifo')) options.fifo = true
const opt = getOptionsWithDefaults(options)
const queueName = normalizeQueueName(queue, opt)
debug({ opt, queueName })
debug({ options, opt, queue, queueName })

Check warning on line 19 in src/monitor.js

View check run for this annotation

Codecov / codecov/patch

src/monitor.js#L19

Added line #L19 was not covered by tests
const data = await getAggregateData(queueName)
console.log(data)
if (save) {
Expand Down
2 changes: 2 additions & 0 deletions test/dedup.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ const options = {
cachePrefix: 'qdone:'
}

jest.retryTimes(3)

beforeEach(shutdownCache)
afterEach(async () => {
jest.restoreAllMocks()
Expand Down

0 comments on commit 037479d

Please sign in to comment.