Skip to content

Commit

Permalink
implement --tag option
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanwitt committed Dec 21, 2023
1 parent 236d0cc commit d0a2a33
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 92 deletions.
49 changes: 49 additions & 0 deletions examples/simpleScheduler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/usr/bin/env -S node --experimental-json-modules

//
// This example implements a basic scheduler on top of qdone. The
// processMessages callback looks at the queue name throws a DoNotProcess
// error if there are already two messages processing on this queue.
//

import {
enqueue,
processMessages,
requestShutdown,
DoNotProcess
} from '../index.mjs' // from 'qdone' for standalone example

const randomEnqueue = setTimeout(function () {
const queue = ['rtest1', 'rtest2', 'rtest3'][Math.round(Math.random()*2)]
const payload = {foo: Math.round(Math.random() * 10)}
console.log({ enqueue: { queue, payload } })
enqueue(queue, JSON.stringify(payload))
}, 200)

process.on('SIGINT', () => { clearInterval(randomEnqueue); console.log('SIGINT'); requestShutdown() })
process.on('SIGTERM', () => { clearInterval(randomEnqueue); console.log('SIGTERM'); requestShutdown() })

// This returns a promise that resolves in the given number of milliseconds
const delay = (ms) => new Promise(resolve => setTimeout(resolve, ms))

// This keeps track of the number of active jobs per queue
const activeCount = {}

async function callback (queue, payload) {
const numActive = activeCount[queue] = (activeCount[queue] || 0) + 1
console.log({ numActive, activeCount })
try {
// Limit to 2 active tasks per queue
if (numActive > 2) {
console.log({ refusing: { queue, payload } })
throw new DoNotProcess()
}

console.log({ processing: { queue, payload } })
await delay(Math.random() * 1000 * 2) // Processing takes up to 2 seconds
} finally {
activeCount[queue] = (activeCount[queue] || 0) - 1
}
}

await processMessages(['rtest1', 'rtest2', 'rtest3'], callback, { verbose: true, disableLog: true })
20 changes: 10 additions & 10 deletions src/cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ let client
* Internal function to setup redis client. Parses URI to figure out
* how to connect.
*/
export function getCacheClient (options) {
export function getCacheClient (opt) {
if (client) {
return client
} else if (options['cache-uri']) {
const url = new URL(options['cache-uri'])
} else if (opt.cacheUri) {
const url = new URL(opt.cacheUri)
if (url.protocol === 'redis:') {
client = new Redis(url.toString())
} else if (url.protocol === 'redis-cluster:') {
Expand All @@ -39,9 +39,9 @@ export function shutdownCache () {
* Returns a promise for the item. Resolves to false if cache is empty, object
* if it is found.
*/
export async function getCache (key, options) {
const client = getCacheClient(options)
const cacheKey = options['cache-prefix'] + key
export async function getCache (key, opt) {
const client = getCacheClient(opt)
const cacheKey = opt.cachePrefix + key
debug({ action: 'getCache', cacheKey })
const result = await client.get(cacheKey)
debug({ action: 'getCache got', cacheKey, result })
Expand All @@ -51,12 +51,12 @@ export async function getCache (key, options) {
/**
* Returns a promise for the status. Encodes object as JSON
*/
export async function setCache (key, value, options) {
const client = getCacheClient(options)
export async function setCache (key, value, opt) {
const client = getCacheClient(opt)
const encoded = JSON.stringify(value)
const cacheKey = options['cache-prefix'] + key
const cacheKey = opt.cachePrefix + key
debug({ action: 'setCache', cacheKey, value })
return client.setex(cacheKey, options['cache-ttl-seconds'], encoded)
return client.setex(cacheKey, opt.cacheTtlSeconds, encoded)
}

debug('loaded')
57 changes: 37 additions & 20 deletions src/cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import commandLineArgs from 'command-line-args'
import Debug from 'debug'
import chalk from 'chalk'

import { QueueDoesNotExist } from '@aws-sdk/client-sqs'
import { defaults, setupAWS, setupVerbose, getOptionsWithDefaults } from './defaults.js'
import { shutdownCache } from './cache.js'
import { withSentry } from './sentry.js'
Expand Down Expand Up @@ -53,7 +54,8 @@ const enqueueOptionDefinitions = [
{ name: 'delay', alias: 'd', type: Number, description: 'Delays delivery of each message by the given number of seconds (up to 900 seconds, or 15 minutes). Defaults to immediate delivery (no delay).' },
{ name: 'dlq', type: Boolean, description: 'Send messages from the failed queue to a DLQ.' },
{ name: 'dql-suffix', type: String, description: `Suffix to append to each queue to generate DLQ name [default: ${defaults.dlqSuffix}]` },
{ name: 'dql-after', type: String, description: `Drives message to the DLQ after this many failures in the failed queue. [default: ${defaults.dlqAfter}]` }
{ name: 'dql-after', type: String, description: `Drives message to the DLQ after this many failures in the failed queue. [default: ${defaults.dlqAfter}]` },
{ name: 'tag', type: String, multiple: true, description: 'Adds an AWS tag to queue creation. Use the format Key=Value. Can specify multiple times.' }
]

export async function enqueue (argv, testHook) {
Expand Down Expand Up @@ -89,6 +91,19 @@ export async function enqueue (argv, testHook) {
throw err
}

// Process tags
if (options.tag && options.tag.length) {
options.tags = {}
for (const input of options.tag) {
debug({ input })
if (input.indexOf('=') === -1) throw new UsageError('Tags must be separated with the "=" character.')
const [key, ...rest] = input.split('=')
const value = rest.join('=')
debug({ input, key, rest, value, tags: options.tags })
options.tags[key] = value
}
}

// Load module after AWS global load
setupAWS(options)
const { enqueue: enqueueOriginal } = await import('./enqueue.js')
Expand Down Expand Up @@ -352,7 +367,7 @@ export async function worker (argv, testHook) {

export async function idleQueues (argv, testHook) {
const optionDefinitions = [
{ name: 'idle-for', alias: 'o', type: Number, defaultValue: 60, description: 'Minutes of inactivity after which a queue is considered idle. [default: 60]' },
{ name: 'idle-for', alias: 'o', type: Number, defaultValue: defaults.idleFor, description: `Minutes of inactivity after which a queue is considered idle. [default: ${defaults.idleFor}]` },
{ name: 'delete', type: Boolean, description: 'Delete the queue if it is idle. The fail queue also must be idle unless you use --unpair.' },
{ name: 'unpair', type: Boolean, description: 'Treat queues and their fail queues as independent. By default they are treated as a unit.' },
{ name: 'include-failed', type: Boolean, description: 'When using \'*\' do not ignore fail queues. This option only applies if you use --unpair. Otherwise, queues and fail queues are treated as a unit.' }
Expand Down Expand Up @@ -407,24 +422,26 @@ export async function idleQueues (argv, testHook) {
setupAWS(options)
const { idleQueues: idleQueuesOriginal } = await import('./idleQueues.js')
const idleQueues = testHook || idleQueuesOriginal

return idleQueues(queues, options)
.then(function (result) {
debug('idleQueues returned', result)
if (result === 'noQueues') return Promise.resolve()
const callsSQS = result.map(a => a.apiCalls.SQS).reduce((a, b) => a + b, 0)
const callsCloudWatch = result.map(a => a.apiCalls.CloudWatch).reduce((a, b) => a + b, 0)
if (options.verbose) console.error(chalk.blue('Used ') + callsSQS + chalk.blue(' SQS and ') + callsCloudWatch + chalk.blue(' CloudWatch API calls.'))
// Print idle queues to stdout
result.filter(a => a.idle).map(a => a.queue).forEach(q => console.log(q))
return result
})
.catch(err => {
if (err.Code === 'AWS.SimpleQueueService.NonExistentQueue') {
console.error(chalk.yellow('This error can occur when you run this command immediately after deleting a queue. Wait 60 seconds and try again.'))
return Promise.reject(err)
}
})
const opt = getOptionsWithDefaults(options)
try {
const result = (
await withSentry(async () => idleQueues(queues, opt), opt)
)
debug('idleQueues returned', result)
if (result === 'noQueues') return Promise.resolve()
const callsSQS = result.map(a => a.apiCalls.SQS).reduce((a, b) => a + b, 0)
const callsCloudWatch = result.map(a => a.apiCalls.CloudWatch).reduce((a, b) => a + b, 0)
if (options.verbose) console.error(chalk.blue('Used ') + callsSQS + chalk.blue(' SQS and ') + callsCloudWatch + chalk.blue(' CloudWatch API calls.'))

// Print idle queues to stdout
result.filter(a => a.idle).map(a => a.queue).forEach(q => console.log(q))
return result
} catch (err) {
if (err instanceof QueueDoesNotExist) {
console.error(chalk.yellow('This error can occur when you run this command immediately after deleting a queue. Wait 60 seconds and try again.'))
}
throw err
}
}

export async function root (originalArgv, testHook) {
Expand Down
14 changes: 14 additions & 0 deletions src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@ import { cheapIdleCheck } from './idleQueues.js'
import { getOptionsWithDefaults } from './defaults.js'
import { getSQSClient } from './sqs.js'

//
// Throwing an instance of this Error allows the processMessages callback to
// refuse a message which then gets immediately returned to the queue.
//
// This has the side effect of throtting the queue since it stops polling on
// the queue until the next queue resolution in processMessages.
//
// This is useful for implementing schedulers on top of qdone, for example, to
// look at the queue name and decide whether to take on a new message.
//
export class DoNotProcess extends Error {}

const debug = Debug('qdone:worker')
Expand Down Expand Up @@ -136,6 +146,10 @@ export async function processMessage (message, callback, qname, qrl, opt) {

// If the callback does not want to process this message, return to queue
if (err instanceof DoNotProcess) {
if (opt.verbose) {
console.error(chalk.blue(' callback ') + chalk.yellow('REFUSED'))
console.error(chalk.blue(' cleaning up (removing message) ...'))
}
const result = await getSQSClient().send(new ChangeMessageVisibilityCommand({
QueueUrl: qrl,
ReceiptHandle: message.ReceiptHandle,
Expand Down
33 changes: 24 additions & 9 deletions src/defaults.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ export const defaults = Object.freeze({
quiet: false,
verbose: false,
cache: false,
cacheUri: undefined,
cachePrefix: 'qdone:',
cacheTtlSeconds: 10,
fifo: false,
disableLog: false,
includeFailed: false,

// Enqueue
groupId: uuidv1(),
Expand All @@ -33,7 +35,11 @@ export const defaults = Object.freeze({
killAfter: 30,
archive: false,
activeOnly: false,
includeFailed: false

// Idle Queues
idleFor: 60,
delete: false,
unpair: false
})

/**
Expand All @@ -58,29 +64,38 @@ export function getOptionsWithDefaults (options) {
region: options.region || process.env.AWS_REGION || defaults.region,
quiet: options.quiet || defaults.quiet,
verbose: options.verbose || defaults.verbose,
cachePrefix: options.cachePrefix || options['cache-prefix'] || defaults.cachePrefix,
cacheTtlSeconds: options.cacheTtlSeconds || options['cache-ttl-seconds'] || defaults.cacheTtlSeconds,
fifo: options.fifo || defaults.fifo,
sentryDsn: options.sentryDsn || options['sentry-dsn'],
disableLog: options['disable-log'] || defaults.disableLog,
disableLog: options.disableLog || options['disable-log'] || defaults.disableLog,

// Cache
cacheUri: options.cacheUri || options['cache-uri'] || defaults.cacheUri,
cachePrefix: options.cachePrefix || options['cache-prefix'] || defaults.cachePrefix,
cacheTtlSeconds: options.cacheTtlSeconds || options['cache-ttl-seconds'] || defaults.cacheTtlSeconds,

// Enqueue
groupId: options.groupId || options['group-id'] || defaults.groupId,
groupIdPerMessage: false,
deduplicationId: options.deduplicationId || options['deduplication-id'] || defaults.deduplicationId,
messageRetentionPeriod: options['message-retention-period'] || defaults.messageRetentionPeriod,
messageRetentionPeriod: options.messageRetentionPeriod || options['message-retention-period'] || defaults.messageRetentionPeriod,
delay: options.delay || defaults.delay,
archive: options.archive || defaults.archive,
dlq: dlq || defaults.dlq,
dlqSuffix: options.dlqSuffix || options['dlq-suffix'] || defaults.dlqSuffix,
dlqAfter: options.dlqAfter || options['dlq-after'] || defaults.dlqAfter,
tags: options.tags || undefined,

// Worker
waitTime: options['wait-time'] || defaults.waitTime,
killAfter: options['kill-after'] || defaults.killAfter,
waitTime: options.waitTime || options['wait-time'] || defaults.waitTime,
killAfter: options.killAfter || options['kill-after'] || defaults.killAfter,
archive: options.archive || defaults.archive,
activeOnly: options['active-only'] || defaults.activeOnly,
includeFailed: options['include-failed'] || defaults.includeFailed
activeOnly: options.activeOnly || options['active-only'] || defaults.activeOnly,
includeFailed: options.includeFailed || options['include-failed'] || defaults.includeFailed,

// Idle Queues
idleFor: options.idleFor || options['idle-for'] || defaults.idleFor,
delete: options.delete || defaults.delete,
unpair: options.delete || defaults.unpair
}
process.env.AWS_REGION = opt.region

Expand Down
3 changes: 3 additions & 0 deletions src/enqueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export async function getOrCreateDLQ (queue, opt) {
Attributes: { MessageRetentionPeriod: opt.messageRetentionPeriod + '' },
QueueName: dqname
}
if (opt.tags) params.tags = opt.tags
if (opt.fifo) params.Attributes.FifoQueue = 'true'
const cmd = new CreateQueueCommand(params)
if (opt.verbose) console.error(chalk.blue('Creating dead letter queue ') + dqname)
Expand Down Expand Up @@ -81,6 +82,7 @@ export async function getOrCreateFailQueue (queue, opt) {
maxReceiveCount: opt.dlqAfter + ''
})
}
if (opt.tags) params.tags = opt.tags
if (opt.fifo) params.Attributes.FifoQueue = 'true'
const cmd = new CreateQueueCommand(params)
if (opt.verbose) console.error(chalk.blue('Creating fail queue ') + fqname)
Expand Down Expand Up @@ -121,6 +123,7 @@ export async function getOrCreateQueue (queue, opt) {
},
QueueName: qname
}
if (opt.tags) params.tags = opt.tags
if (opt.fifo) params.Attributes.FifoQueue = 'true'
const cmd = new CreateQueueCommand(params)
debug({ params })
Expand Down
Loading

0 comments on commit d0a2a33

Please sign in to comment.