Skip to content

Commit

Permalink
add consumer and example
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanwitt committed Dec 20, 2023
1 parent d9599bb commit 8db04c8
Show file tree
Hide file tree
Showing 5 changed files with 367 additions and 9 deletions.
20 changes: 20 additions & 0 deletions examples/processMessageExample.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/usr/bin/env -S node --experimental-json-modules
import { enqueue, processMessages, requestShutdown } from 'qdone'

const randomEnqueue = setInterval(function () {
enqueue(['rtest1', 'rtest2', 'rtest3'][Math.round(Math.random()*2)], JSON.stringify({foo: Math.round(Math.random() * 10)}))
}, 1000)

process.on('SIGINT', () => { clearInterval(randomEnqueue); console.log('SIGINT'); requestShutdown() })
process.on('SIGTERM', () => { clearInterval(randomEnqueue); console.log('SIGTERM'); requestShutdown() })

//await enqueue('test1', JSON.stringify({one: 1}))
//await enqueue('test2', JSON.stringify({two: 2}))
//await enqueue('test3', JSON.stringify({three: 3}))

async function callback (queue, payload) {
console.log({ queue, payload })
//if (payload.three) requestShutdown()
}

await processMessages(['rtest1', 'rtest2', 'rtest3'], callback, { verbose: true, disableLog: true })
1 change: 1 addition & 0 deletions index.mjs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export { enqueue, enqueueBatch } from './src/enqueue.js'
export { processMessages, requestShutdown } from './src/consumer.js'
311 changes: 311 additions & 0 deletions src/consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,311 @@
/**
* Consumer implementation.
*/

import {
ChangeMessageVisibilityCommand,
ReceiveMessageCommand,
DeleteMessageCommand
} from '@aws-sdk/client-sqs'
import { exec } from 'node:child_process'
import treeKill from 'tree-kill'
import chalk from 'chalk'
import Debug from 'debug'

import { normalizeQueueName, getQnameUrlPairs } from './qrlCache.js'
import { cheapIdleCheck } from './idleQueues.js'
import { getOptionsWithDefaults } from './defaults.js'
import { getSQSClient } from './sqs.js'

const debug = Debug('qdone:worker')

// Global flag for shutdown request
let shutdownRequested = false
const shutdownCallbacks = []

export function requestShutdown () {
shutdownRequested = true
for (const callback of shutdownCallbacks) {
try { callback() } catch (e) { }
}
}

export async function processMessage (message, callback, qname, qrl, opt) {
debug('processMessage', message, qname, qrl)
const payload = JSON.parse(message.Body)
if (opt.verbose) {
console.error(chalk.blue(' Processing payload:'), payload)
} else if (!opt.disableLog) {
console.log(JSON.stringify({
event: 'MESSAGE_PROCESSING_START',
timestamp: new Date(),
messageId: message.MessageId,
payload: payload
}))
}

const jobStart = new Date()
let visibilityTimeout = 30 // this should be the queue timeout
let timeoutExtender

async function extendTimeout () {
debug('extendTimeout')
const maxJobRun = 12 * 60 * 60
const jobRunTime = ((new Date()) - jobStart) / 1000
// Double every time, up to max
visibilityTimeout = Math.min(visibilityTimeout * 2, maxJobRun - jobRunTime, opt.killAfter - jobRunTime)
if (opt.verbose) {
console.error(
chalk.blue(' Ran for ') + jobRunTime +
chalk.blue(' seconds, requesting another ') + visibilityTimeout +
chalk.blue(' seconds')
)
}

try {
const result = await getSQSClient().send(new ChangeMessageVisibilityCommand({
QueueUrl: qrl,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: visibilityTimeout
}))
debug('ChangeMessageVisibility.then returned', result)
if (
jobRunTime + visibilityTimeout >= maxJobRun ||
jobRunTime + visibilityTimeout >= opt.killAfter
) {
if (opt.verbose) console.error(chalk.yellow(' warning: this is our last time extension'))
} else {
// Extend when we get 50% of the way to timeout
timeoutExtender = setTimeout(extendTimeout, visibilityTimeout * 1000 * 0.5)
}
} catch (err) {
debug('changeMessageVisibility.catch returned', err)
// Rejection means we're ouuta time, whatever, let the job die
if (opt.verbose) {
console.error(chalk.red(' failed to extend job: ') + err)
} else if (!opt.disableLog) {
// Production error logging
console.log(JSON.stringify({
event: 'MESSAGE_PROCESSING_FAILED',
reason: 'ran longer than --kill-after',
timestamp: new Date(),
messageId: message.MessageId,
payload: payload,
errorMessage: err.toString().split('\n').slice(1).join('\n').trim() || undefined,
err
}))
}
}
}

// Extend when we get 50% of the way to timeout
timeoutExtender = setTimeout(extendTimeout, visibilityTimeout * 1000 * 0.5)
debug('timeout', visibilityTimeout * 1000 * 0.5)

try {
// Process message
const result = await callback(qname, payload)
debug('processMessage callback finished', { payload, result })
clearTimeout(timeoutExtender)
if (opt.verbose) {
console.error(chalk.green(' SUCCESS'))
console.error(chalk.blue(' cleaning up (removing message) ...'))
}
await getSQSClient().send(new DeleteMessageCommand({
QueueUrl: qrl,
ReceiptHandle: message.ReceiptHandle
}))
if (opt.verbose) {
console.error(chalk.blue(' done'))
console.error()
} else if (!opt.disableLog) {
console.log(JSON.stringify({
event: 'MESSAGE_PROCESSING_COMPLETE',
timestamp: new Date(),
messageId: message.MessageId,
payload: payload
}))
}
return { noJobs: 0, jobsSucceeded: 1, jobsFailed: 0 }
} catch (err) {
// Fail path for job execution
debug('exec.catch')
clearTimeout(timeoutExtender)
if (opt.verbose) {
console.error(chalk.red(' FAILED'))
console.error(chalk.blue(' error : ') + err)
} else if (!opt.disableLog) {
// Production error logging
console.log(JSON.stringify({
event: 'MESSAGE_PROCESSING_FAILED',
reason: 'exception thrown',
timestamp: new Date(),
messageId: message.MessageId,
payload: payload,
errorMessage: err.toString().split('\n').slice(1).join('\n').trim() || undefined,
err
}))
}
return { noJobs: 0, jobsSucceeded: 0, jobsFailed: 1 }
}
}

//
// Pull work off of a single queue
//
export async function pollSingleQueue (qname, qrl, callback, opt) {
debug('pollSingleQueue', { qname, qrl, callback, opt })
const params = {
AttributeNames: ['All'],
MaxNumberOfMessages: 1,
MessageAttributeNames: ['All'],
QueueUrl: qrl,
VisibilityTimeout: 30,
WaitTimeSeconds: opt.waitTime
}
const response = await getSQSClient().send(new ReceiveMessageCommand(params))
debug('ReceiveMessage response', response)
if (shutdownRequested) return { noJobs: 0, jobsSucceeded: 0, jobsFailed: 0 }
if (response.Messages) {
const message = response.Messages[0]
if (opt.verbose) console.error(chalk.blue(' Found message ' + message.MessageId))
return processMessage(message, callback, qname, qrl, opt)
} else {
return { noJobs: 1, jobsSucceeded: 0, jobsFailed: 0 }
}
}

//
// Resolve a set of queues
//
export async function resolveQueues (queues, opt) {
// Start processing
if (opt.verbose) console.error(chalk.blue('Resolving queues: ') + queues.join(' '))
const qnames = queues.map(queue => normalizeQueueName(queue, opt))
const pairs = await getQnameUrlPairs(qnames, opt)

// Figure out which pairs are active
const activePairs = []
if (opt.activeOnly) {
debug({ pairsBeforeCheck: pairs })
await Promise.all(pairs.map(async pair => {
const { idle } = await cheapIdleCheck(pair.qname, pair.qrl, opt)
if (!idle) activePairs.push(pair)
}))
}

// Finished resolving
debug('getQnameUrlPairs.then')
if (opt.verbose) {
console.error(chalk.blue(' done'))
console.error()
}

// Figure out which queues we want to listen on, choosing between active and
// all, filtering out failed queues if the user wants that
const selectedPairs = (opt.activeOnly ? activePairs : pairs)
.filter(({ qname }) => {
const suf = opt.failSuffix + (opt.fifo ? '.fifo' : '')
const isFailQueue = qname.slice(-suf.length) === suf
const shouldInclude = opt.includeFailed ? true : !isFailQueue
return shouldInclude
})

return selectedPairs
}

const delay = (ms) => new Promise(resolve => setTimeout(resolve, ms))

//
// Consumer
//
export async function processMessages (queues, callback, options) {
const opt = getOptionsWithDefaults(options)
debug('processMessages', {queues, callback, options, opt })

const stats = { noJobs: 0, jobsSucceeded: 0, jobsFailed: 0 }
const activeLoops = {}

function shutdownCallback () {
if (opt.verbose) {
debug({ activeLoops })
const activeQueues = Object.keys(activeLoops).filter(q => activeLoops[q]).map(q => q.slice(opt.prefix.length))
if (activeQueues.length) {
console.error(chalk.blue('Waiting for work to finish on the following queues: ') + activeQueues.join(chalk.blue(', ')))
}
}
}
shutdownCallbacks.push(shutdownCallback)

// Listen to a queue until it is out of messages
async function listenLoop (qname, qrl) {
try {
if (shutdownRequested) return
if (opt.verbose) {
console.error(
chalk.blue('Looking for work on ') +
qname.slice(opt.prefix.length) +
chalk.blue(' (' + qrl + ')')
)
}
// Aggregate the results
const { noJobs, jobsSucceeded, jobsFailed } = await pollSingleQueue(qname, qrl, callback, opt)
stats.noJobs += noJobs
stats.jobsFailed += jobsFailed
stats.jobsSucceeded += jobsSucceeded

// No work? return to outer loop
if (noJobs) return

// Otherwise keep going
return listenLoop(qname, qrl)

} catch (err) {
// TODO: Sentry
console.error(chalk.red(' ERROR in listenLoop'))
console.error(chalk.blue(' error : ') + err)

} finally {
delete activeLoops[qname]
}
}

// Resolve loop
while (!shutdownRequested) {
const start = new Date()
const selectedPairs = await resolveQueues(queues, opt)
if (shutdownRequested) break

// But only if we have queues to listen on
if (selectedPairs.length) {
if (opt.verbose) {
console.error(chalk.blue('Listening to queues (in this order):'))
console.error(selectedPairs.map(({ qname, qrl }) =>
' ' + qname.slice(opt.prefix.length) + chalk.blue(' - ' + qrl)
).join('\n'))
console.error()
}

// Launch listen loop for each queue
for (const { qname, qrl } of selectedPairs) {
if (!activeLoops[qname]) activeLoops[qname] = listenLoop(qname, qrl)
}
}
// Wait until the next time we need to resolve
if (!shutdownRequested) {
const msSoFar = Math.max(0, new Date() - start)
const msUntilNextResolve = Math.max(0, opt.waitTime * 1000 - msSoFar)
debug({ msSoFar, msUntilNextResolve })
if (msUntilNextResolve) {
if (opt.verbose) console.error(chalk.blue('Will resolve queues again in ' + Math.round(msUntilNextResolve / 1000) + ' seconds'))
await delay(msUntilNextResolve)
}
}
}

// Wait on all work to finish
// shutdownCallback()
await Promise.all(Object.values(activeLoops))
}

debug('loaded')
Loading

0 comments on commit 8db04c8

Please sign in to comment.