Skip to content

Commit

Permalink
long time to get that working
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanwitt committed Jul 8, 2024
1 parent 7b1aae5 commit 95703f4
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 92 deletions.
2 changes: 1 addition & 1 deletion src/defaults.js
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ export function getOptionsWithDefaults (options) {
// Idle Queues
idleFor: options.idleFor || options['idle-for'] || defaults.idleFor,
delete: options.delete || defaults.delete,
unpair: options.delete || defaults.unpair,
unpair: options.unpair || defaults.unpair,

// Check
create: options.create || defaults.create,
Expand Down
181 changes: 90 additions & 91 deletions src/idleQueues.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const metricNames = [
* Actual SQS call, used in conjunction with cache.
*/
export async function _cheapIdleCheck (qname, qrl, opt) {
debug('_cheapIdleCheck', qname, qrl)
try {
const client = getSQSClient()
const cmd = new GetQueueAttributesCommand({ AttributeNames: attributeNames, QueueUrl: qrl })
Expand All @@ -46,11 +47,13 @@ export async function _cheapIdleCheck (qname, qrl, opt) {
result.queue = qname.slice(opt.prefix.length)
// We are idle if all the messages attributes are zero
result.idle = attributeNames.filter(k => result[k] === '0').length === attributeNames.length
result.exists = true
debug({ result, SQS: 1 })
return { result, SQS: 1 }
} catch (e) {
debug({ _cheapIdleCheck: e })
if (e instanceof QueueDoesNotExist) {
// Count deleted queues as idle
return { result: { idle: true }, SQS: 1 }
return { result: { idle: undefined, exists: false }, SQS: 1 }
} else {
throw e
}
Expand All @@ -62,6 +65,7 @@ export async function _cheapIdleCheck (qname, qrl, opt) {
* at this immediate moment.
*/
export async function cheapIdleCheck (qname, qrl, opt) {
debug('cheapIdleCheck', qname, qrl)
// Just call the API if we don't have a cache
if (!opt.cacheUri) return _cheapIdleCheck(qname, qrl, opt)

Expand Down Expand Up @@ -125,19 +129,20 @@ export async function checkIdle (qname, qrl, opt) {
const { result: cheapResult, SQS } = await cheapIdleCheck(qname, qrl, opt)
debug('cheapResult', cheapResult)

// Short circuit further calls if cheap result shows data
if (cheapResult.idle === false) {
// Short circuit further calls if cheap result is conclusive
if (cheapResult.idle === false || cheapResult.exists === false) {
return {
queue: qname.slice(opt.prefix.length),
cheap: cheapResult,
idle: false,
idle: cheapResult.idle,
exists: cheapResult.exists,
apiCalls: { SQS, CloudWatch: 0 }
}
}

// If we get here, there's nothing in the queue at the moment,
// so we have to check metrics one at a time
const apiCalls = { SQS: 1, CloudWatch: 0 }
const apiCalls = { SQS, CloudWatch: 0 }
const results = []
let idle = true
for (const metricName of metricNames) {
Expand All @@ -158,7 +163,8 @@ export async function checkIdle (qname, qrl, opt) {
queue: qname.slice(opt.prefix.length),
cheap: cheapResult,
apiCalls,
idle
idle,
exists: true
},
...results // merge in results from CloudWatch
)
Expand Down Expand Up @@ -210,107 +216,96 @@ export async function processQueue (qname, qrl, opt) {
}

/**
* Processes a queue and its fail queue, treating them as a unit.
* Processes a queue and its fail and delete queue, treating them as a unit.
*/
export async function processQueuePair (qname, qrl, opt) {
export async function processQueueSet (qname, qrl, opt) {
const isFifo = qname.endsWith('.fifo')
const normalizeOptions = Object.assign({}, opt, { fifo: isFifo })

// Generate DLQ name/url
const dqname = normalizeDLQName(qname, normalizeOptions)
const dqrl = normalizeDLQName(dqname, normalizeOptions)

// Generate fail queue name/url
const fqname = normalizeFailQueueName(qname, normalizeOptions)
const fqrl = normalizeFailQueueName(fqname, normalizeOptions)
// Generate DLQ name/url
const dqname = normalizeDLQName(qname, normalizeOptions)
const dqrl = normalizeDLQName(fqname, normalizeOptions)

// Idle check
const result = await checkIdle(qname, qrl, opt)
debug('result', result)
debug({ qname, qrl, dqname, dqrl, fqname, fqrl })

// Queue is active
const active = !result.idle
if (active) {
if (opt.verbose) console.error(chalk.blue('Queue ') + qname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'active' + chalk.blue(' in the last ') + opt.idleFor + chalk.blue(' minutes.'))
return result
}

// Queue is idle
if (opt.verbose) console.error(chalk.blue('Queue ') + qname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'idle' + chalk.blue(' for the last ') + opt.idleFor + chalk.blue(' minutes.'))
// Idle check
const qresult = await checkIdle(qname, qrl, opt)
const fqresult = await checkIdle(fqname, fqrl, opt)
const dqresult = await checkIdle(dqname, dqrl, opt)
debug({ qresult, fqresult, dqresult })

// Check fail queue
try {
const fresult = await checkIdle(fqname, fqrl, opt)
debug('fresult', fresult)
const idleCheckResult = Object.assign(
result,
{ idle: result.idle && fresult.idle, failq: fresult },
{
apiCalls: {
SQS: result.apiCalls.SQS + fresult.apiCalls.SQS,
CloudWatch: result.apiCalls.CloudWatch + fresult.apiCalls.CloudWatch
}
// Start building return value
const result = Object.assign(
{
queue: qname,
idle: (
qresult.idle &&
(!fqresult.exists || fqresult.idle) &&
(!fqresult.exists || dqresult.idle)
),
apiCalls: {
SQS: qresult.apiCalls.SQS + fqresult.apiCalls.SQS + dqresult.apiCalls.SQS,
CloudWatch: qresult.apiCalls.CloudWatch + fqresult.apiCalls.CloudWatch + dqresult.apiCalls.CloudWatch
}
)

// Queue is active
const factive = !fresult.idle
if (factive) {
if (opt.verbose) console.error(chalk.blue('Queue ') + fqname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'active' + chalk.blue(' in the last ') + opt.idleFor + chalk.blue(' minutes.'))
return idleCheckResult
}
)

// Queue is idle
if (opt.verbose) console.error(chalk.blue('Queue ') + fqname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'idle' + chalk.blue(' for the last ') + opt.idleFor + chalk.blue(' minutes.'))
// Queue is idle
if (qresult.idle && opt.verbose) console.error(chalk.blue('Queue ') + qname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'idle' + chalk.blue(' for the last ') + opt.idleFor + chalk.blue(' minutes.'))
if (fqresult.idle && opt.verbose) console.error(chalk.blue('Queue ') + fqname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'idle' + chalk.blue(' for the last ') + opt.idleFor + chalk.blue(' minutes.'))
if (dqresult.idle && opt.verbose) console.error(chalk.blue('Queue ') + dqname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'idle' + chalk.blue(' for the last ') + opt.idleFor + chalk.blue(' minutes.'))

// Trigger a delete if the user wants it
if (!opt.delete) return idleCheckResult
// Delete if all are idle
const canDelete = (
(qresult.idle || qresult.exists === false) &&
(fqresult.idle || fqresult.exists === false) &&
(dqresult.idle || dqresult.exists === false)
)
debug({ canDelete })

let dresult, dfresult
try {
dresult = await deleteQueue(qname, qrl, opt)
} catch (e) {
// Ignore queues that don't exist already in case that: 1) there was a double
// call or 2) the SQS node that got the request is not consistent yet
if (!(e instanceof QueueDoesNotExist)) throw e
}
try {
dfresult = await deleteQueue(fqname, fqrl, opt)
} catch (e) {
// Ignore queues that don't exist already in case that: 1) there was a double
// call or 2) the SQS node that got the request is not consistent yet
if (!(e instanceof QueueDoesNotExist)) throw e
}
return Object.assign(idleCheckResult, {
apiCalls: {
// Sum the SQS calls across all four
SQS: [result, fresult, dresult, dfresult]
.map(r => r.apiCalls.SQS)
.reduce((a, b) => a + b, 0),
// Sum the CloudWatch calls across all four
CloudWatch: [result, fresult, dresult, dfresult]
.map(r => r.apiCalls.CloudWatch)
.reduce((a, b) => a + b, 0)
if (opt.delete && canDelete) {
// Normal
const qdresult = await (async () => {
debug({ qresult })
try {
if (qresult.idle) return deleteQueue(qname, qrl, opt)
} catch (e) {
if (!(e instanceof QueueDoesNotExist)) throw e
}
})
} catch (e) {
// Handle the case where the fail queue has been deleted or was never
// created for some reason
if (!(e instanceof QueueDoesNotExist)) throw e
})()
if (qdresult) { result.apiCalls.SQS += qdresult.apiCalls.SQS }
debug({ qdresult })

// Fail queue doesn't exist if we get here
if (opt.verbose) console.error(chalk.blue('Queue ') + fqname.slice(opt.prefix.length) + chalk.blue(' does not exist.'))
// Fail
const fqdresult = await (async () => {
debug({ fqresult })
try {
if (fqresult.idle) return deleteQueue(fqname, fqrl, opt)
} catch (e) {
if (!(e instanceof QueueDoesNotExist)) throw e
}
})()
if (fqdresult) { result.apiCalls.SQS += fqdresult.apiCalls.SQS }
debug({ fqdresult })

// Handle delete
if (!opt.delete) return result
const deleteResult = await deleteQueue(qname, qrl, opt)
const resultIncludingDelete = Object.assign(result, {
deleted: deleteResult.deleted,
apiCalls: {
SQS: result.apiCalls.SQS + deleteResult.apiCalls.SQS,
CloudWatch: result.apiCalls.CloudWatch + deleteResult.apiCalls.CloudWatch
// Dead
const dqdresult = await (async () => {
debug({ dqresult })
try {
if (dqresult.idle) return deleteQueue(dqname, dqrl, opt)
} catch (e) {
if (!(e instanceof QueueDoesNotExist)) throw e
}
})
return resultIncludingDelete
})()
if (dqdresult) { result.apiCalls.SQS += dqdresult.apiCalls.SQS }
debug({ dqdresult })
}

return result
}

//
Expand All @@ -333,7 +328,11 @@ export async function idleQueues (queues, options) {
const sufFifo = opt.failSuffix + fifoSuffix
const isFail = entry.qname.endsWith(suf)
const isFifoFail = entry.qname.endsWith(sufFifo)
return opt.includeFailed ? true : (!isFail && !isFifoFail)
const sufDead = opt.dlqSuffix
const sufFifoDead = opt.dlqSuffix + fifoSuffix
const isDead = entry.qname.endsWith(sufDead)
const isFifoDead = entry.qname.endsWith(sufFifoDead)
return opt.includeFailed ? true : (!isFail && !isFifoFail && !isDead && !isFifoDead)
})

// But only if we have queues to remove
Expand All @@ -347,7 +346,7 @@ export async function idleQueues (queues, options) {
}
// Check each queue in parallel
if (opt.unpair) return Promise.all(filteredEntries.map(e => processQueue(e.qname, e.qrl, opt)))
return Promise.all(filteredEntries.map(e => processQueuePair(e.qname, e.qrl, opt)))
return Promise.all(filteredEntries.map(e => processQueueSet(e.qname, e.qrl, opt)))
}

// Otherwise, let caller know
Expand Down
2 changes: 2 additions & 0 deletions test/idleQueues.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ describe('_cheapIdleCheck', () => {
ApproximateNumberOfMessages: '1',
ApproximateNumberOfMessagesNotVisible: '0',
idle: false,
exists: true,
queue: qname
}
})
Expand Down Expand Up @@ -79,6 +80,7 @@ describe('cheapIdleCheck', () => {
ApproximateNumberOfMessages: '1',
ApproximateNumberOfMessagesNotVisible: '0',
idle: false,
exists: true,
queue: qname
}
})
Expand Down

0 comments on commit 95703f4

Please sign in to comment.