Skip to content

Commit

Permalink
fix testing issues and one bug
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanwitt committed Jun 7, 2024
1 parent 64761aa commit 0ce4eeb
Show file tree
Hide file tree
Showing 2 changed files with 416 additions and 87 deletions.
30 changes: 18 additions & 12 deletions src/enqueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -332,17 +332,18 @@ let requestCount = 0
// Returns number of messages flushed.
//
export async function flushMessages (qrl, opt, sendBuffer) {
debug('flushMessages', qrl)
debug('flushMessages', { qrl, sendBuffer })
// Track our outgoing messages to map with Failed / Successful returns
const messagesById = new Map()
const resultsById = new Map()
const results = []
if (sendBuffer[qrl] && sendBuffer[qrl].length) {
for (const message of sendBuffer[qrl]) {
messagesById.set(message.Id, message)
const Id = message.Id
messagesById.set(Id, message)
// Pre-prepare results
const result = {}
resultsById.set(message.Id, result)
const result = { Id }
resultsById.set(Id, result)
results.push(result)
}
}
Expand All @@ -365,10 +366,9 @@ export async function flushMessages (qrl, opt, sendBuffer) {

// Send batch
const data = await sendMessageBatch(qrl, batch, opt)
debug({ data })

// Fail if there are any individual message failures
if (data.Failed && data.Failed.length) {
if (data?.Failed && data?.Failed.length) {
const err = new Error('One or more message failures: ' + JSON.stringify(data.Failed))
err.Failed = data.Failed
throw err
Expand All @@ -377,12 +377,17 @@ export async function flushMessages (qrl, opt, sendBuffer) {
// If we actually managed to flush any of them
if (batch.length) {
requestCount += 1
for (const { Id, MessageId } of data.Successful) {
const { MessageAttributes } = messagesById.get(Id)
if (MessageAttributes?.QdoneDeduplicaitonId?.StringValue) {
resultsById.get(Id).QdoneDeduplicaitonId = MessageAttributes?.QdoneDeduplicaitonId?.StringValue
if (data?.Successful) {
for (const { Id, MessageId } of data.Successful) {
const result = resultsById.get(Id)
const message = messagesById.get(Id)
result.MessageId = MessageId
result.Id = Id
if (message?.MessageAttributes?.QdoneDeduplicationId?.StringValue) {
result.QdoneDeduplicationId = message?.MessageAttributes?.QdoneDeduplicationId?.StringValue
}
if (opt.verbose) console.error(chalk.blue('Enqueued job ') + MessageId + chalk.blue(' request ' + requestCount))
}
if (opt.verbose) console.error(chalk.blue('Enqueued job ') + MessageId + chalk.blue(' request ' + requestCount))
}
numFlushed += batch.length
}
Expand All @@ -396,11 +401,12 @@ export async function flushMessages (qrl, opt, sendBuffer) {
// Automaticaly flushes if queue has >= 10 messages.
// Returns number of messages flushed.
//
const debugAddMessage = Debug('qdone:enqueue:addMessage')
export async function addMessage (qrl, command, messageIndex, opt, sendBuffer, messageOptions) {
const message = formatMessage(command, messageIndex, opt, messageOptions)
sendBuffer[qrl] = sendBuffer[qrl] || []
sendBuffer[qrl].push(message)
debug({ location: 'addMessage', sendBuffer })
debugAddMessage({ location: 'addMessage', messageIndex, sendBuffer })
if (sendBuffer[qrl].length >= 10) {
return flushMessages(qrl, opt, sendBuffer)
}
Expand Down
Loading

0 comments on commit 0ce4eeb

Please sign in to comment.