diff --git a/src/consumer.js b/src/consumer.js index 0725360..9e162c6 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -84,19 +84,25 @@ export async function processMessages (queues, callback, options) { // Keep track of how many messages could be returned from each queue const activeQrls = new Map() + const listeningQrls = new Set() let maxReturnCount = 0 const listen = async (qname, qrl, maxMessages) => { if (opt.verbose) { console.error(chalk.blue('Listening on: '), qname) } - activeQrls.set(qrl, (activeQrls.get(qrl) || 0) + 1) maxReturnCount += maxMessages try { + listeningQrls.add(qrl) const messages = await getMessages(qrl, opt, maxMessages) + listeningQrls.delete(qrl) if (!shutdownRequested) { if (messages.length) { + activeQrls.set(qrl, (activeQrls.get(qrl) || 0) + 1) await jobExecutor.executeJobs(messages, callback, qname, qrl) + const count = activeQrls.get(qrl) - 1 + if (count) activeQrls.set(qrl, count) + else activeQrls.delete(qrl) queueManager.updateIcehouse(qrl, false) } else { // If we didn't get any, update the icehouse so we can back off @@ -106,9 +112,6 @@ export async function processMessages (queues, callback, options) { // Max job accounting maxReturnCount -= maxMessages - const count = activeQrls.get(qrl) - 1 - if (count) activeQrls.set(qrl, count) - else activeQrls.delete(qrl) } catch (e) { // If the queue has been cleaned up, we should back off anyway if (e instanceof QueueDoesNotExist) { @@ -119,6 +122,14 @@ export async function processMessages (queues, callback, options) { } } + if (opt.verbose) { + function printUrls () { + console.error({ activeQrls, listeningQrls }) + if (!shutdownRequested) setTimeout(printUrls, 2000) + } + printUrls() + } + while (!shutdownRequested) { // eslint-disable-line // Figure out how we are running const runningJobs = jobExecutor.runningJobCount() @@ -147,19 +158,19 @@ export async function processMessages (queues, callback, options) { let jobsLeft = targetJobs if (opt.verbose) { - console.error({ maxConcurrentJobs: opt.maxConcurrentJobs, maxReturnCount, runningJobs, allowedJobs, maxLatency, latencyFactor, freememFactor, loadFactor, overallFactor, targetJobs, activeQrls }) + console.error({ maxConcurrentJobs: opt.maxConcurrentJobs, maxReturnCount, runningJobs, allowedJobs, maxLatency, latencyFactor, freememFactor, loadFactor, overallFactor, targetJobs }) } for (const { qname, qrl } of queueManager.getPairs()) { // const qcount = jobExecutor.runningJobCountForQueue(qname) // console.log({ evaluating: { qname, qrl, qcount, jobsLeft, activeQrlsHasQrl: activeQrls.has(qrl) } }) if (jobsLeft <= 0) break - // if (activeQrls.has(qrl)) continue + if (listeningQrls.has(qrl)) continue const maxMessages = Math.min(10, jobsLeft) listen(qname, qrl, maxMessages) jobsLeft -= maxMessages // debug({ listenedTo: { qname, maxMessages, jobsLeft } }) } - await delay(500) + await delay(300) } debug('after all') } diff --git a/src/scheduler/systemMonitor.js b/src/scheduler/systemMonitor.js index f0904e7..2ac8f0a 100644 --- a/src/scheduler/systemMonitor.js +++ b/src/scheduler/systemMonitor.js @@ -71,7 +71,7 @@ export class SystemMonitor { */ measureLoad () { - const [newLoad, ] = os.loadavg() + const [newLoad] = os.loadavg() const previousLoad = this.oneMinuteLoad if (previousLoad !== newLoad) { const e = 1884 / 2048 // see include/linux/sched/loadavg.h @@ -84,7 +84,7 @@ export class SystemMonitor { } } - getLoad() { + getLoad () { return this.instantaneousLoad } diff --git a/test/enqueue.test.js b/test/enqueue.test.js index 7799017..6981aa4 100644 --- a/test/enqueue.test.js +++ b/test/enqueue.test.js @@ -785,7 +785,7 @@ describe('addMessage / flushMessages', () => { { MD5OfMessageBody: md5, MessageId: messageId, Id: '6' }, { MD5OfMessageBody: md5, MessageId: messageId, Id: '7' }, { MD5OfMessageBody: md5, MessageId: messageId, Id: '8' }, - { MD5OfMessageBody: md5, MessageId: messageId, Id: '9' }, + { MD5OfMessageBody: md5, MessageId: messageId, Id: '9' } ] }) .resolvesOnce({ @@ -894,7 +894,7 @@ describe('addMessage / flushMessages', () => { .on(SendMessageBatchCommand, { QueueUrl: qrl }) .resolvesOnce({ Successful: [ - { MD5OfMessageBody: md5, MessageId: messageId, Id: '0' }, + { MD5OfMessageBody: md5, MessageId: messageId, Id: '0' } ] }) @@ -918,7 +918,6 @@ describe('addMessage / flushMessages', () => { Entries: [message] }) ) - }) test('failed messages fail the whole batch', async () => {