Skip to content

Commit

Permalink
Merge pull request #66 from suredone/delete-idle-queues-2
Browse files Browse the repository at this point in the history
Delete idle queues updates
  • Loading branch information
ryanwitt authored Jul 8, 2024
2 parents efb72b9 + 95703f4 commit bec4790
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 79 deletions.
2 changes: 1 addition & 1 deletion src/defaults.js
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ export function getOptionsWithDefaults (options) {
// Idle Queues
idleFor: options.idleFor || options['idle-for'] || defaults.idleFor,
delete: options.delete || defaults.delete,
unpair: options.delete || defaults.unpair,
unpair: options.unpair || defaults.unpair,

// Check
create: options.create || defaults.create,
Expand Down
170 changes: 92 additions & 78 deletions src/idleQueues.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { getCloudWatchClient } from './cloudWatch.js'
import { getOptionsWithDefaults } from './defaults.js'
import { GetQueueAttributesCommand, DeleteQueueCommand, QueueDoesNotExist } from '@aws-sdk/client-sqs'
import { GetMetricStatisticsCommand } from '@aws-sdk/client-cloudwatch'
import { normalizeFailQueueName, getQnameUrlPairs, fifoSuffix } from './qrlCache.js'
import { normalizeFailQueueName, normalizeDLQName, getQnameUrlPairs, fifoSuffix } from './qrlCache.js'
import { getCache, setCache } from './cache.js'
// const AWS = require('aws-sdk')

Expand Down Expand Up @@ -37,6 +37,7 @@ const metricNames = [
* Actual SQS call, used in conjunction with cache.
*/
export async function _cheapIdleCheck (qname, qrl, opt) {
debug('_cheapIdleCheck', qname, qrl)
try {
const client = getSQSClient()
const cmd = new GetQueueAttributesCommand({ AttributeNames: attributeNames, QueueUrl: qrl })
Expand All @@ -46,11 +47,13 @@ export async function _cheapIdleCheck (qname, qrl, opt) {
result.queue = qname.slice(opt.prefix.length)
// We are idle if all the messages attributes are zero
result.idle = attributeNames.filter(k => result[k] === '0').length === attributeNames.length
result.exists = true
debug({ result, SQS: 1 })
return { result, SQS: 1 }
} catch (e) {
debug({ _cheapIdleCheck: e })

Check warning on line 54 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L54

Added line #L54 was not covered by tests
if (e instanceof QueueDoesNotExist) {
// Count deleted queues as idle
return { result: { idle: true }, SQS: 1 }
return { result: { idle: undefined, exists: false }, SQS: 1 }
} else {
throw e

Check warning on line 58 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L56-L58

Added lines #L56 - L58 were not covered by tests
}
Expand All @@ -62,6 +65,7 @@ export async function _cheapIdleCheck (qname, qrl, opt) {
* at this immediate moment.
*/
export async function cheapIdleCheck (qname, qrl, opt) {
debug('cheapIdleCheck', qname, qrl)
// Just call the API if we don't have a cache
if (!opt.cacheUri) return _cheapIdleCheck(qname, qrl, opt)

Expand Down Expand Up @@ -125,19 +129,20 @@ export async function checkIdle (qname, qrl, opt) {
const { result: cheapResult, SQS } = await cheapIdleCheck(qname, qrl, opt)
debug('cheapResult', cheapResult)

Check warning on line 130 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L129-L130

Added lines #L129 - L130 were not covered by tests

// Short circuit further calls if cheap result shows data
if (cheapResult.idle === false) {
// Short circuit further calls if cheap result is conclusive
if (cheapResult.idle === false || cheapResult.exists === false) {
return {

Check warning on line 134 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L134

Added line #L134 was not covered by tests
queue: qname.slice(opt.prefix.length),
cheap: cheapResult,
idle: false,
idle: cheapResult.idle,
exists: cheapResult.exists,
apiCalls: { SQS, CloudWatch: 0 }
}
}

// If we get here, there's nothing in the queue at the moment,
// so we have to check metrics one at a time
const apiCalls = { SQS: 1, CloudWatch: 0 }
const apiCalls = { SQS, CloudWatch: 0 }
const results = []
let idle = true
for (const metricName of metricNames) {

Check warning on line 148 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L145-L148

Added lines #L145 - L148 were not covered by tests
Expand All @@ -158,7 +163,8 @@ export async function checkIdle (qname, qrl, opt) {
queue: qname.slice(opt.prefix.length),
cheap: cheapResult,
apiCalls,
idle
idle,
exists: true
},
...results // merge in results from CloudWatch
)
Expand Down Expand Up @@ -210,92 +216,96 @@ export async function processQueue (qname, qrl, opt) {
}

/**
* Processes a queue and its fail queue, treating them as a unit.
* Processes a queue and its fail and delete queue, treating them as a unit.
*/
export async function processQueuePair (qname, qrl, opt) {
export async function processQueueSet (qname, qrl, opt) {

Check warning on line 221 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L221

Added line #L221 was not covered by tests
const isFifo = qname.endsWith('.fifo')
const normalizeOptions = Object.assign({}, opt, { fifo: isFifo })

Check warning on line 223 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L223

Added line #L223 was not covered by tests

// Generate DLQ name/url
const dqname = normalizeDLQName(qname, normalizeOptions)
const dqrl = normalizeDLQName(dqname, normalizeOptions)

Check warning on line 227 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L226-L227

Added lines #L226 - L227 were not covered by tests

// Generate fail queue name/url
const fqname = normalizeFailQueueName(qname, normalizeOptions)
const fqrl = normalizeFailQueueName(fqname, normalizeOptions)

Check warning on line 231 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L230-L231

Added lines #L230 - L231 were not covered by tests

// Idle check
const result = await checkIdle(qname, qrl, opt)
debug('result', result)

// Queue is active
const active = !result.idle
if (active) {
if (opt.verbose) console.error(chalk.blue('Queue ') + qname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'active' + chalk.blue(' in the last ') + opt.idleFor + chalk.blue(' minutes.'))
return result
}
debug({ qname, qrl, dqname, dqrl, fqname, fqrl })

Check warning on line 233 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L233

Added line #L233 was not covered by tests

// Queue is idle
if (opt.verbose) console.error(chalk.blue('Queue ') + qname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'idle' + chalk.blue(' for the last ') + opt.idleFor + chalk.blue(' minutes.'))
// Idle check
const qresult = await checkIdle(qname, qrl, opt)
const fqresult = await checkIdle(fqname, fqrl, opt)
const dqresult = await checkIdle(dqname, dqrl, opt)
debug({ qresult, fqresult, dqresult })

Check warning on line 239 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L236-L239

Added lines #L236 - L239 were not covered by tests

// Check fail queue
try {
const fresult = await checkIdle(fqname, fqrl, opt)
debug('fresult', fresult)
const idleCheckResult = Object.assign(
result,
{ idle: result.idle && fresult.idle, failq: fresult },
{
apiCalls: {
SQS: result.apiCalls.SQS + fresult.apiCalls.SQS,
CloudWatch: result.apiCalls.CloudWatch + fresult.apiCalls.CloudWatch
}
// Start building return value
const result = Object.assign(

Check warning on line 242 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L242

Added line #L242 was not covered by tests
{
queue: qname,
idle: (
qresult.idle &&
(!fqresult.exists || fqresult.idle) &&
(!fqresult.exists || dqresult.idle)

Check warning on line 248 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L246-L248

Added lines #L246 - L248 were not covered by tests
),
apiCalls: {
SQS: qresult.apiCalls.SQS + fqresult.apiCalls.SQS + dqresult.apiCalls.SQS,
CloudWatch: qresult.apiCalls.CloudWatch + fqresult.apiCalls.CloudWatch + dqresult.apiCalls.CloudWatch
}
)

// Queue is active
const factive = !fresult.idle
if (factive) {
if (opt.verbose) console.error(chalk.blue('Queue ') + fqname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'active' + chalk.blue(' in the last ') + opt.idleFor + chalk.blue(' minutes.'))
return idleCheckResult
}
)

// Queue is idle
if (opt.verbose) console.error(chalk.blue('Queue ') + fqname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'idle' + chalk.blue(' for the last ') + opt.idleFor + chalk.blue(' minutes.'))
// Queue is idle
if (qresult.idle && opt.verbose) console.error(chalk.blue('Queue ') + qname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'idle' + chalk.blue(' for the last ') + opt.idleFor + chalk.blue(' minutes.'))
if (fqresult.idle && opt.verbose) console.error(chalk.blue('Queue ') + fqname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'idle' + chalk.blue(' for the last ') + opt.idleFor + chalk.blue(' minutes.'))
if (dqresult.idle && opt.verbose) console.error(chalk.blue('Queue ') + dqname.slice(opt.prefix.length) + chalk.blue(' has been ') + 'idle' + chalk.blue(' for the last ') + opt.idleFor + chalk.blue(' minutes.'))

// Trigger a delete if the user wants it
if (!opt.delete) return idleCheckResult
const [dresult, dfresult] = await Promise.all([
deleteQueue(qname, qrl, opt),
deleteQueue(fqname, fqrl, opt)
])
return Object.assign(idleCheckResult, {
apiCalls: {
// Sum the SQS calls across all four
SQS: [result, fresult, dresult, dfresult]
.map(r => r.apiCalls.SQS)
.reduce((a, b) => a + b, 0),
// Sum the CloudWatch calls across all four
CloudWatch: [result, fresult, dresult, dfresult]
.map(r => r.apiCalls.CloudWatch)
.reduce((a, b) => a + b, 0)
// Delete if all are idle
const canDelete = (
(qresult.idle || qresult.exists === false) &&
(fqresult.idle || fqresult.exists === false) &&
(dqresult.idle || dqresult.exists === false)

Check warning on line 266 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L265-L266

Added lines #L265 - L266 were not covered by tests
)
debug({ canDelete })

Check warning on line 268 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L268

Added line #L268 was not covered by tests

if (opt.delete && canDelete) {
// Normal
const qdresult = await (async () => {
debug({ qresult })
try {

Check warning on line 274 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L272-L274

Added lines #L272 - L274 were not covered by tests
if (qresult.idle) return deleteQueue(qname, qrl, opt)
} catch (e) {
if (!(e instanceof QueueDoesNotExist)) throw e
}
})
} catch (e) {
// Handle the case where the fail queue has been deleted or was never
// created for some reason
if (!(e instanceof QueueDoesNotExist)) throw e
})()
if (qdresult) { result.apiCalls.SQS += qdresult.apiCalls.SQS }
debug({ qdresult })

Check warning on line 281 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L281

Added line #L281 was not covered by tests

// Fail queue doesn't exist if we get here
if (opt.verbose) console.error(chalk.blue('Queue ') + fqname.slice(opt.prefix.length) + chalk.blue(' does not exist.'))
// Fail
const fqdresult = await (async () => {
debug({ fqresult })
try {

Check warning on line 286 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L284-L286

Added lines #L284 - L286 were not covered by tests
if (fqresult.idle) return deleteQueue(fqname, fqrl, opt)
} catch (e) {
if (!(e instanceof QueueDoesNotExist)) throw e
}
})()
if (fqdresult) { result.apiCalls.SQS += fqdresult.apiCalls.SQS }
debug({ fqdresult })

Check warning on line 293 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L293

Added line #L293 was not covered by tests

// Handle delete
if (!opt.delete) return result
const deleteResult = await deleteQueue(qname, qrl, opt)
const resultIncludingDelete = Object.assign(result, {
deleted: deleteResult.deleted,
apiCalls: {
SQS: result.apiCalls.SQS + deleteResult.apiCalls.SQS,
CloudWatch: result.apiCalls.CloudWatch + deleteResult.apiCalls.CloudWatch
// Dead
const dqdresult = await (async () => {
debug({ dqresult })
try {

Check warning on line 298 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L296-L298

Added lines #L296 - L298 were not covered by tests
if (dqresult.idle) return deleteQueue(dqname, dqrl, opt)
} catch (e) {
if (!(e instanceof QueueDoesNotExist)) throw e
}
})
return resultIncludingDelete
})()
if (dqdresult) { result.apiCalls.SQS += dqdresult.apiCalls.SQS }
debug({ dqdresult })

Check warning on line 305 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L305

Added line #L305 was not covered by tests
}

return result

Check warning on line 308 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L308

Added line #L308 was not covered by tests
}

//
Expand All @@ -318,7 +328,11 @@ export async function idleQueues (queues, options) {
const sufFifo = opt.failSuffix + fifoSuffix
const isFail = entry.qname.endsWith(suf)
const isFifoFail = entry.qname.endsWith(sufFifo)
return opt.includeFailed ? true : (!isFail && !isFifoFail)
const sufDead = opt.dlqSuffix
const sufFifoDead = opt.dlqSuffix + fifoSuffix
const isDead = entry.qname.endsWith(sufDead)
const isFifoDead = entry.qname.endsWith(sufFifoDead)

Check warning on line 334 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L326-L334

Added lines #L326 - L334 were not covered by tests
return opt.includeFailed ? true : (!isFail && !isFifoFail && !isDead && !isFifoDead)
})

// But only if we have queues to remove
Expand All @@ -332,7 +346,7 @@ export async function idleQueues (queues, options) {
}
// Check each queue in parallel
if (opt.unpair) return Promise.all(filteredEntries.map(e => processQueue(e.qname, e.qrl, opt)))
return Promise.all(filteredEntries.map(e => processQueuePair(e.qname, e.qrl, opt)))
return Promise.all(filteredEntries.map(e => processQueueSet(e.qname, e.qrl, opt)))

Check warning on line 349 in src/idleQueues.js

View check run for this annotation

Codecov / codecov/patch

src/idleQueues.js#L349

Added line #L349 was not covered by tests
}

// Otherwise, let caller know
Expand Down
2 changes: 2 additions & 0 deletions test/idleQueues.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ describe('_cheapIdleCheck', () => {
ApproximateNumberOfMessages: '1',
ApproximateNumberOfMessagesNotVisible: '0',
idle: false,
exists: true,
queue: qname
}
})
Expand Down Expand Up @@ -79,6 +80,7 @@ describe('cheapIdleCheck', () => {
ApproximateNumberOfMessages: '1',
ApproximateNumberOfMessagesNotVisible: '0',
idle: false,
exists: true,
queue: qname
}
})
Expand Down

0 comments on commit bec4790

Please sign in to comment.