Skip to content

Commit

Permalink
check in the finally debugged initial version of check command
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanwitt committed Mar 20, 2024
1 parent ab5dbb8 commit a080150
Show file tree
Hide file tree
Showing 4 changed files with 399 additions and 51 deletions.
216 changes: 216 additions & 0 deletions src/check.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
import chalk from 'chalk'
import Debug from 'debug'
import {
GetQueueAttributesCommand,
SetQueueAttributesCommand,
QueueDoesNotExist,
RequestThrottled,
KmsThrottled
} from '@aws-sdk/client-sqs'

import {
qrlCacheGet,
normalizeQueueName,
normalizeFailQueueName,
normalizeDLQName,
getQnameUrlPairs
} from './qrlCache.js'
import { getSQSClient } from './sqs.js'
import {
getDLQParams,
getFailParams,
getQueueParams,
getOrCreateQueue,
getOrCreateFailQueue,
getOrCreateDLQ
} from './enqueue.js'
import { getOptionsWithDefaults } from './defaults.js'
import { ExponentialBackoff } from './exponentialBackoff.js'

const debug = Debug('qdone:check')

/**
* Loops through attributes, checking each and returning true if they match
*/
export function attributesMatch (current, desired, opt, indent = '') {
let match = true
for (const attribute in desired) {
if (current[attribute] !== desired[attribute]) {
if (opt.verbose) console.error(chalk.yellow(indent + 'Attribute mismatch: ') + attribute + chalk.yellow(' should be ') + desired[attribute] + chalk.yellow(' but is ') + current[attribute])
match = false
}
}
return match
}

/**
* Checks a DLQ, creating if the create option is set and modifying it if the
* overwrite option is set.
*/
export async function checkDLQ (queue, qrl, opt, indent = '') {
debug({ checkDLQ: { queue, qrl } })
const dqname = normalizeDLQName(queue, opt)
if (opt.verbose) console.error(chalk.blue(indent + 'checking ') + dqname)

// Check DLQ
let dqrl
try {
dqrl = await qrlCacheGet(dqname)
} catch (e) {
if (!(e instanceof QueueDoesNotExist)) throw e
if (opt.verbose) console.error(chalk.red(indent + ' does not exist'))
if (opt.create) {
if (opt.verbose) console.error(chalk.green(indent + ' creating'))
dqrl = await getOrCreateDLQ(queue, opt)
} else {
return
}
}

// Check attributes
const { params: { Attributes: desired } } = getDLQParams(queue, opt)
const { Attributes: current } = await getQueueAttributes(dqrl)
if (attributesMatch(current, desired, opt, indent + ' ')) {
if (opt.verbose) console.error(chalk.green(indent + ' all good'))
} else {
if (opt.overwrite) {
if (opt.verbose) console.error(chalk.green(indent + ' modifying'))
return setQueueAttributes(dqrl, desired)
}
}
}

/**
* Checks a fail queue, creating if the create option is set and modifying it if the
* overwrite option is set.
*/
export async function checkFailQueue (queue, qrl, opt, indent = '') {
// Check dead first
await checkDLQ(queue, qrl, opt, indent)

// Check fail queue
const fqname = normalizeFailQueueName(queue, opt)
let fqrl
try {
fqrl = await qrlCacheGet(fqname)
} catch (e) {
if (!(e instanceof QueueDoesNotExist)) throw e
if (opt.verbose) console.error(chalk.red(indent + ' does not exist'))
if (opt.create) {
if (opt.verbose) console.error(chalk.green(indent + ' creating'))
fqrl = await getOrCreateFailQueue(queue, opt)
} else {
return
}
}

try {
// Get fail queue params, creating fail queue if it doesn't exist and create flag is set
if (opt.verbose) console.error(chalk.blue(indent + 'checking ') + fqname)
const { params: { Attributes: desired } } = await getFailParams(queue, opt)
const { Attributes: current } = await getQueueAttributes(fqrl)
if (attributesMatch(current, desired, opt, indent + ' ')) {
if (opt.verbose) console.error(chalk.green(indent + ' all good'))
} else {
if (opt.overwrite) {
if (opt.verbose) console.error(chalk.green(indent + ' modifying'))
return setQueueAttributes(fqrl, desired)
}
}
} catch (e) {
if (!(e instanceof QueueDoesNotExist)) throw e
if (opt.verbose) console.error(chalk.red(indent + ' missing dlq'))
}
}

/**
* Checks a queue, creating or modifying it if the create option is set
* and it needs it.
*/
export async function checkQueue (queue, qrl, opt, indent = '') {
const qname = normalizeQueueName(queue, opt)
if (opt.verbose) console.error(chalk.blue(indent + 'checking ') + qname)
await checkFailQueue(queue, qrl, opt, indent + ' ')
try {
const { params: { Attributes: desired } } = await getQueueParams(queue, opt)
const { Attributes: current, $metadata } = await getQueueAttributes(qrl)
debug({ current, $metadata })
if (attributesMatch(current, desired, opt, indent + ' ')) {
if (opt.verbose) console.error(chalk.green(indent + ' all good'))
} else {
if (opt.overwrite) {
if (opt.verbose) console.error(chalk.green(indent + ' modifying'))
return setQueueAttributes(qrl, desired)
}
}
} catch (e) {
if (!(e instanceof QueueDoesNotExist)) throw e
if (opt.verbose) console.error(chalk.red(indent + ' missing fail queue'))
}
}

export async function getQueueAttributes (qrl) {
debug('getQueueAttributes(', qrl, ')')
const client = getSQSClient()
const params = { AttributeNames: ['All'], QueueUrl: qrl }
const cmd = new GetQueueAttributesCommand(params)
// debug({ cmd })
const data = await client.send(cmd)
debug('GetQueueAttributes returned', data)
return data
}

export async function setQueueAttributes (qrl, attributes) {
debug('setQueueAttributes(', qrl, attributes, ')')
const client = getSQSClient()
const params = { Attributes: attributes, QueueUrl: qrl }
const cmd = new SetQueueAttributesCommand(params)
debug({ cmd })
const data = await client.send(cmd)
debug('SetQueueAttributes returned', data)
return data
}

// Retry happens within the context of the send functions
const retryableExceptions = [
RequestThrottled,
KmsThrottled,
QueueDoesNotExist // Queue could temporarily not exist due to eventual consistency, let it retry
]

//
// Enqueue a single command
// Returns a promise for the SQS API response.
//
export async function check (queues, options) {
debug('check(', { queues }, ')')
const opt = getOptionsWithDefaults(options)

// Start processing
if (opt.verbose) console.error(chalk.blue('Resolving queues: ') + queues.join(' '))
const qnames = queues.map(queue => normalizeQueueName(queue, opt))
const pairs = await getQnameUrlPairs(qnames, opt)

// Figure out which queues we want to listen on, choosing between active and
// all, filtering out failed queues if the user wants that
const selectedPairs = pairs
.filter(({ qname }) => qname)
.filter(({ qname }) => {
const suf = opt.failSuffix + (opt.fifo ? '.fifo' : '')
const deadSuf = opt.dlqSuffix + (opt.fifo ? '.fifo' : '')
const isFailQueue = qname.slice(-suf.length) === suf
const isDeadQueue = qname.slice(-deadSuf.length) === deadSuf
const isPlain = !isFailQueue && !isDeadQueue
const shouldInclude = isPlain || (isFailQueue && opt.includeFailed) || (isDeadQueue && opt.includeDead)
return shouldInclude
})

for (const { qname, qrl } of selectedPairs) {
debug({ qname, qrl })
await checkQueue(qname, qrl, opt)
}

debug({ pairs })
}

debug('loaded')
81 changes: 78 additions & 3 deletions src/cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,23 @@ const enqueueOptionDefinitions = [
{ name: 'delay', type: Number, description: 'Delays delivery of the enqueued message by the given number of seconds (up to 900 seconds, or 15 minutes). Defaults to immediate delivery (no delay).' },
{ name: 'fail-delay', type: Number, description: 'Delays delivery of all messages on this queue by the given number of seconds (up to 900 seconds, or 15 minutes). Only takes effect if this queue is created during this enqueue operation. Defaults to immediate delivery (no delay).' },
{ name: 'dlq', type: Boolean, description: 'Send messages from the failed queue to a DLQ.' },
{ name: 'dql-suffix', type: String, description: `Suffix to append to each queue to generate DLQ name [default: ${defaults.dlqSuffix}]` },
{ name: 'dql-after', type: String, description: `Drives message to the DLQ after this many failures in the failed queue. [default: ${defaults.dlqAfter}]` },
{ name: 'dlq-suffix', type: String, description: `Suffix to append to each queue to generate DLQ name [default: ${defaults.dlqSuffix}]` },
{ name: 'dlq-after', type: String, description: `Drives message to the DLQ after this many failures in the failed queue. [default: ${defaults.dlqAfter}]` },
{ name: 'tag', type: String, multiple: true, description: 'Adds an AWS tag to queue creation. Use the format Key=Value. Can specify multiple times.' }
]

const checkOptionDefinitions = [
{ name: 'create', type: Boolean, description: 'Create queues that do not exist' },
{ name: 'overwrite', type: Boolean, description: 'Overwrite queue attributes that do not match expected' },
{ name: 'fifo', alias: 'f', type: Boolean, description: 'Create new queues as FIFOs' },
{ name: 'include-failed', type: Boolean, description: 'When using \'*\' do not ignore fail queues.' },
{ name: 'include-dead', type: Boolean, description: 'When using \'*\' do not ignore dead queues.' },
{ name: 'message-retention-period', type: Number, description: `Number of seconds to retain jobs (up to 14 days). [default: ${defaults.messageRetentionPeriod}]` },
{ name: 'delay', type: Number, description: 'Delays delivery of the enqueued message by the given number of seconds (up to 900 seconds, or 15 minutes). Defaults to immediate delivery (no delay).' },
{ name: 'fail-delay', type: Number, description: 'Delays delivery of all messages on this queue by the given number of seconds (up to 900 seconds, or 15 minutes). Only takes effect if this queue is created during this enqueue operation. Defaults to immediate delivery (no delay).' },
{ name: 'dlq', type: Boolean, description: 'Send messages from the failed queue to a DLQ.' },
{ name: 'dlq-suffix', type: String, description: `Suffix to append to each queue to generate DLQ name [default: ${defaults.dlqSuffix}]` },
{ name: 'dlq-after', type: String, description: `Drives message to the DLQ after this many failures in the failed queue. [default: ${defaults.dlqAfter}]` },
{ name: 'tag', type: String, multiple: true, description: 'Adds an AWS tag to queue creation. Use the format Key=Value. Can specify multiple times.' }
]

Expand Down Expand Up @@ -123,6 +138,64 @@ export async function enqueue (argv, testHook) {
return result
}

export async function check (argv, testHook) {
const optionDefinitions = [].concat(checkOptionDefinitions, globalOptionDefinitions)
const usageSections = [
{ content: 'usage: qdone check [options] <queue>', raw: true },
{ content: 'Options', raw: true },
{ optionList: optionDefinitions },
{ content: 'SQS API Call Complexity', raw: true, long: true },
{
content: [
{ count: '2 [ + 3 ]', summary: 'one call to resolve the queue name\none call to check the command\none extra calls if the queue does not match and --modify option is set' }
],
long: true
},
awsUsageHeader, awsUsageBody
]
debug('check argv', argv)

// Parse command and options
let queues, options
try {
options = commandLineArgs(optionDefinitions, { argv, partial: true })
setupVerbose(options)
debug('check options', options)
if (options.help) return Promise.resolve(console.log(getUsage(usageSections)))
if (!options._unknown || options._unknown.length === 0) throw new UsageError('check requres one or more <queue> arguments')
queues = options._unknown
debug('queues', queues)
} catch (err) {
console.log(getUsage(usageSections.filter(s => !s.long)))
throw err
}

// Process tags
if (options.tag && options.tag.length) {
options.tags = {}
for (const input of options.tag) {
debug({ input })
if (input.indexOf('=') === -1) throw new UsageError('Tags must be separated with the "=" character.')
const [key, ...rest] = input.split('=')
const value = rest.join('=')
debug({ input, key, rest, value, tags: options.tags })
options.tags[key] = value
}
}

// Load module after AWS global load
setupAWS(options)
const { check: checkOriginal } = await import('./check.js')
const check = testHook || checkOriginal

// Normal (non batch) enqueue
const opt = getOptionsWithDefaults(options)
const result = (
await withSentry(async () => check(queues, opt), opt)
)
return result
}

const monitorOptionDefinitions = [
{ name: 'save', alias: 's', type: Boolean, description: 'Saves data to CloudWatch' }
]
Expand Down Expand Up @@ -452,7 +525,7 @@ export async function idleQueues (argv, testHook) {
}

export async function root (originalArgv, testHook) {
const validCommands = [null, 'enqueue', 'enqueue-batch', 'worker', 'idle-queues', 'monitor']
const validCommands = [null, 'enqueue', 'enqueue-batch', 'worker', 'idle-queues', 'monitor', 'check']
const usageSections = [
{ content: 'qdone - Command line job queue for SQS', raw: true, long: true },
{ content: 'usage: qdone [options] <command>', raw: true },
Expand Down Expand Up @@ -505,6 +578,8 @@ export async function root (originalArgv, testHook) {
return idleQueues(argv, testHook)
} else if (command === 'monitor') {
return monitor(argv, testHook)
} else if (command === 'check') {
return check(argv, testHook)
}
}

Expand Down
13 changes: 10 additions & 3 deletions src/defaults.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ export const defaults = Object.freeze({
// Idle Queues
idleFor: 60,
delete: false,
unpair: false
unpair: false,

// Check
create: false
})

function validateInteger (opt, name) {
Expand All @@ -69,7 +72,7 @@ export function getOptionsWithDefaults (options) {
if (!options) options = {}

// Activate DLQ if any option is set
const dlq = options.dlq || !!(options['dlq-suffix'] || options['dlq-after'] || options['dlq-name'])
const dlq = options.dlq || !!(options['dlq-suffix'] || options['dlq-after'] || options['dlq-name'] || options.dlqSuffix || options.dlqAfter || options.dlqName)

const opt = {
// Shared
Expand Down Expand Up @@ -117,7 +120,11 @@ export function getOptionsWithDefaults (options) {
// Idle Queues
idleFor: options.idleFor || options['idle-for'] || defaults.idleFor,
delete: options.delete || defaults.delete,
unpair: options.delete || defaults.unpair
unpair: options.delete || defaults.unpair,

// Check
create: options.create || defaults.create,
overwrite: options.overwrite || defaults.overwrite
}

// Setting this env here means we don't have to in AWS SDK constructors
Expand Down
Loading

0 comments on commit a080150

Please sign in to comment.