Skip to content

Commit

Permalink
bugfix and test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanwitt committed Sep 8, 2024
1 parent 68d3262 commit d82b417
Show file tree
Hide file tree
Showing 5 changed files with 296 additions and 15 deletions.
10 changes: 2 additions & 8 deletions src/cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -451,19 +451,14 @@ export async function idleQueues (argv, testHook) {
{
content: [
{ count: '1 + q + i', desc: 'q: number of queues in pattern\ni: number of idle queues' },
{ context: 'with --delete options', count: '1 + q + 3i', desc: 'q: number of queues in pattern\ni: number of idle queues' },
{ context: 'with --unpair option', count: '1 + q', desc: 'q: number of queues in pattern' },
{ context: 'with --unpair and --delete options', count: '1 + q + i', desc: 'q: number of queues in pattern\ni: number of idle queues' },
{ desc: 'NOTE: the --unpair option not cheaper if you include fail queues, because it doubles q.' }
{ context: 'with --delete options', count: '1 + q + 3i', desc: 'q: number of queues in pattern\ni: number of idle queues' }
],
long: true
},
{ content: 'CloudWatch API Call Complexity', raw: true, long: true },
{
content: [
{ count: 'min: 0 (if queue and fail queue have waiting messages)\nmax: 12q\nexpected (approximate observed): 0.5q + 12i', desc: 'q: number of queues in pattern\ni: number of idle queues' },
{ context: 'with --unpair option', count: 'min: 0 (if queue has waiting messages)\nmax: 6q\nexpected (approximate observed): q + 6i', desc: 'q: number of queues in pattern\ni: number of idle queues' },
{ desc: 'NOTE: the --unpair option not cheaper if you include fail queues, because it doubles q.' }
{ count: 'min: 0 (if queue and fail queue have waiting messages)\nmax: 12q\nexpected (approximate observed): 0.5q + 12i', desc: 'q: number of queues in pattern\ni: number of idle queues' }
],
long: true
},
Expand All @@ -479,7 +474,6 @@ export async function idleQueues (argv, testHook) {
debug('idleQueues options', options)
if (options.help) return Promise.resolve(console.log(getUsage(usageSections)))
if (!options._unknown || options._unknown.length === 0) throw new UsageError('idle-queues requres one or more <queue> arguments')
if (options['include-failed'] && !options.unpair) throw new UsageError('--include-failed should be used with --unpair')
if (options['idle-for'] < 5) throw new UsageError('--idle-for must be at least 5 minutes (CloudWatch limitation)')
queues = options._unknown
debug('queues', queues)
Expand Down
2 changes: 1 addition & 1 deletion src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export async function processMessages (queues, callback, options) {

if (!shutdownRequested) {
if (messages.length) {
jobExecutor.executeJobs(messages, callback, qname, qrl)
await jobExecutor.executeJobs(messages, callback, qname, qrl)
queueManager.updateIcehouse(qrl, false)
} else {
// If we didn't get any, update the icehouse so we can back off
Expand Down
4 changes: 2 additions & 2 deletions src/idleQueues.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { getCloudWatchClient } from './cloudWatch.js'
import { getOptionsWithDefaults } from './defaults.js'
import { GetQueueAttributesCommand, DeleteQueueCommand, QueueDoesNotExist } from '@aws-sdk/client-sqs'
import { GetMetricStatisticsCommand } from '@aws-sdk/client-cloudwatch'
import { normalizeFailQueueName, normalizeDLQName, getQnameUrlPairs, fifoSuffix } from './qrlCache.js'
import { normalizeFailQueueName, normalizeDLQName, getQnameUrlPairs, fifoSuffix, qrlCacheSet } from './qrlCache.js'
import { getCache, setCache } from './cache.js'
// const AWS = require('aws-sdk')

Expand Down Expand Up @@ -133,7 +133,7 @@ export async function checkIdle (qname, qrl, opt) {
if (cheapResult.idle === false || cheapResult.exists === false) {
return {
queue: qname.slice(opt.prefix.length),
cheap: cheapResult,
cheap: { SQS, result: cheapResult },
idle: cheapResult.idle,
exists: cheapResult.exists,
apiCalls: { SQS, CloudWatch: 0 }
Expand Down
4 changes: 3 additions & 1 deletion src/scheduler/jobExecutor.js
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ export class JobExecutor {
// Begin tracking jobs
const jobs = messages.map(message => this.addJob(message, callback, qname, qrl))
const isFifo = qrl.endsWith('.fifo')
const runningJobs = []

// console.log(jobs)

Expand All @@ -385,7 +386,8 @@ export class JobExecutor {
// console.log({ i, nextJobAtt: nextJob?.message?.Attributes, nextJobIsSerial })
// Execute serial or parallel
if (nextJobIsSerial) await this.runJob(job)
else this.runJob(job)
else runningJobs.push(this.runJob(job))
}
await Promise.all(runningJobs)
}
}
Loading

0 comments on commit d82b417

Please sign in to comment.