Skip to content

Commit

Permalink
a little cleaner
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanwitt committed Sep 10, 2024
1 parent cbef94d commit 0c1a6c2
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 12 deletions.
25 changes: 18 additions & 7 deletions src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,25 @@ export async function processMessages (queues, callback, options) {

// Keep track of how many messages could be returned from each queue
const activeQrls = new Map()
const listeningQrls = new Set()
let maxReturnCount = 0
const listen = async (qname, qrl, maxMessages) => {
if (opt.verbose) {
console.error(chalk.blue('Listening on: '), qname)
}
activeQrls.set(qrl, (activeQrls.get(qrl) || 0) + 1)
maxReturnCount += maxMessages
try {
listeningQrls.add(qrl)
const messages = await getMessages(qrl, opt, maxMessages)
listeningQrls.delete(qrl)

if (!shutdownRequested) {
if (messages.length) {
activeQrls.set(qrl, (activeQrls.get(qrl) || 0) + 1)
await jobExecutor.executeJobs(messages, callback, qname, qrl)
const count = activeQrls.get(qrl) - 1
if (count) activeQrls.set(qrl, count)
else activeQrls.delete(qrl)
queueManager.updateIcehouse(qrl, false)
} else {
// If we didn't get any, update the icehouse so we can back off
Expand All @@ -106,9 +112,6 @@ export async function processMessages (queues, callback, options) {

// Max job accounting
maxReturnCount -= maxMessages
const count = activeQrls.get(qrl) - 1
if (count) activeQrls.set(qrl, count)
else activeQrls.delete(qrl)
} catch (e) {
// If the queue has been cleaned up, we should back off anyway
if (e instanceof QueueDoesNotExist) {
Expand All @@ -119,6 +122,14 @@ export async function processMessages (queues, callback, options) {
}
}

if (opt.verbose) {
function printUrls () {
console.error({ activeQrls, listeningQrls })
if (!shutdownRequested) setTimeout(printUrls, 2000)
}
printUrls()
}

while (!shutdownRequested) { // eslint-disable-line
// Figure out how we are running
const runningJobs = jobExecutor.runningJobCount()
Expand Down Expand Up @@ -147,19 +158,19 @@ export async function processMessages (queues, callback, options) {
let jobsLeft = targetJobs

if (opt.verbose) {
console.error({ maxConcurrentJobs: opt.maxConcurrentJobs, maxReturnCount, runningJobs, allowedJobs, maxLatency, latencyFactor, freememFactor, loadFactor, overallFactor, targetJobs, activeQrls })
console.error({ maxConcurrentJobs: opt.maxConcurrentJobs, maxReturnCount, runningJobs, allowedJobs, maxLatency, latencyFactor, freememFactor, loadFactor, overallFactor, targetJobs })
}
for (const { qname, qrl } of queueManager.getPairs()) {
// const qcount = jobExecutor.runningJobCountForQueue(qname)
// console.log({ evaluating: { qname, qrl, qcount, jobsLeft, activeQrlsHasQrl: activeQrls.has(qrl) } })
if (jobsLeft <= 0) break
// if (activeQrls.has(qrl)) continue
if (listeningQrls.has(qrl)) continue
const maxMessages = Math.min(10, jobsLeft)
listen(qname, qrl, maxMessages)
jobsLeft -= maxMessages
// debug({ listenedTo: { qname, maxMessages, jobsLeft } })
}
await delay(500)
await delay(300)
}
debug('after all')
}
4 changes: 2 additions & 2 deletions src/scheduler/systemMonitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export class SystemMonitor {
*/

measureLoad () {
const [newLoad, ] = os.loadavg()
const [newLoad] = os.loadavg()
const previousLoad = this.oneMinuteLoad
if (previousLoad !== newLoad) {
const e = 1884 / 2048 // see include/linux/sched/loadavg.h
Expand All @@ -84,7 +84,7 @@ export class SystemMonitor {
}
}

getLoad() {
getLoad () {
return this.instantaneousLoad
}

Expand Down
5 changes: 2 additions & 3 deletions test/enqueue.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ describe('addMessage / flushMessages', () => {
{ MD5OfMessageBody: md5, MessageId: messageId, Id: '6' },
{ MD5OfMessageBody: md5, MessageId: messageId, Id: '7' },
{ MD5OfMessageBody: md5, MessageId: messageId, Id: '8' },
{ MD5OfMessageBody: md5, MessageId: messageId, Id: '9' },
{ MD5OfMessageBody: md5, MessageId: messageId, Id: '9' }
]
})
.resolvesOnce({
Expand Down Expand Up @@ -894,7 +894,7 @@ describe('addMessage / flushMessages', () => {
.on(SendMessageBatchCommand, { QueueUrl: qrl })
.resolvesOnce({
Successful: [
{ MD5OfMessageBody: md5, MessageId: messageId, Id: '0' },
{ MD5OfMessageBody: md5, MessageId: messageId, Id: '0' }
]
})

Expand All @@ -918,7 +918,6 @@ describe('addMessage / flushMessages', () => {
Entries: [message]
})
)

})

test('failed messages fail the whole batch', async () => {
Expand Down

0 comments on commit 0c1a6c2

Please sign in to comment.