Skip to content

Commit

Permalink
Merge pull request #64 from suredone/return-deduplication-ids
Browse files Browse the repository at this point in the history
Return deduplicationId to callers of enqueue() and enqueueBatch()
  • Loading branch information
jasonspalace authored Jun 18, 2024
2 parents 0bd632f + fb5739a commit efb72b9
Show file tree
Hide file tree
Showing 4 changed files with 555 additions and 109 deletions.
4 changes: 2 additions & 2 deletions npm-shrinkwrap.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "qdone",
"version": "2.0.46-alpha",
"version": "2.0.48-alpha",
"description": "A distributed scheduler for SQS",
"type": "module",
"main": "./index.js",
Expand Down
70 changes: 56 additions & 14 deletions src/enqueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,18 @@ export async function sendMessageBatch (qrl, messages, opt) {
const promises = params.Entries.map(async m => ({ m, shouldEnqueue: await dedupShouldEnqueue(m, opt) }))
const results = await Promise.all(promises)
params.Entries = results.filter(({ shouldEnqueue }) => shouldEnqueue).map(({ m }) => m)

Check warning on line 278 in src/enqueue.js

View check run for this annotation

Codecov / codecov/patch

src/enqueue.js#L276-L278

Added lines #L276 - L278 were not covered by tests
if (!params.Entries.length) return { Failed: [], Successful: [] }
if (!params.Entries.length) {
const result = {

Check warning on line 280 in src/enqueue.js

View check run for this annotation

Codecov / codecov/patch

src/enqueue.js#L280

Added line #L280 was not covered by tests
Failed: [],
Successful: results.map(
({ m: { Id: id, MessageAttributes: ma } }) => ({

Check warning on line 283 in src/enqueue.js

View check run for this annotation

Codecov / codecov/patch

src/enqueue.js#L283

Added line #L283 was not covered by tests
Id: id,
MessageId: 'duplicate',
QdoneDeduplicationId: ma?.QdoneDeduplicationId?.StringValue
}))
}
return result

Check warning on line 289 in src/enqueue.js

View check run for this annotation

Codecov / codecov/patch

src/enqueue.js#L289

Added line #L289 was not covered by tests
}
}

// Send them
Expand Down Expand Up @@ -332,11 +343,27 @@ let requestCount = 0
// Returns number of messages flushed.
//
export async function flushMessages (qrl, opt, sendBuffer) {
debug('flushMessages', qrl)
debug('flushMessages', { qrl, sendBuffer })
// Track our outgoing messages to map with Failed / Successful returns
const messagesById = new Map()
const resultsById = new Map()
const results = []
if (sendBuffer[qrl] && sendBuffer[qrl].length) {
for (const message of sendBuffer[qrl]) {
const Id = message.Id
messagesById.set(Id, message)
// Pre-prepare results
const result = { Id }
resultsById.set(Id, result)
results.push(result)
}
}
// Flush until empty
let numFlushed = 0
async function whileNotEmpty () {
if (!(sendBuffer[qrl] && sendBuffer[qrl].length)) return numFlushed
if (!(sendBuffer[qrl] && sendBuffer[qrl].length)) {
return { numFlushed, results }
}
// Construct batch until full
const batch = []
let nextSize = JSON.stringify(sendBuffer[qrl][0]).length
Expand All @@ -350,10 +377,9 @@ export async function flushMessages (qrl, opt, sendBuffer) {

// Send batch
const data = await sendMessageBatch(qrl, batch, opt)
debug({ data })

// Fail if there are any individual message failures
if (data.Failed && data.Failed.length) {
if (data?.Failed && data?.Failed.length) {
const err = new Error('One or more message failures: ' + JSON.stringify(data.Failed))
err.Failed = data.Failed
throw err

Check warning on line 385 in src/enqueue.js

View check run for this annotation

Codecov / codecov/patch

src/enqueue.js#L383-L385

Added lines #L383 - L385 were not covered by tests
Expand All @@ -362,9 +388,18 @@ export async function flushMessages (qrl, opt, sendBuffer) {
// If we actually managed to flush any of them
if (batch.length) {
requestCount += 1
data.Successful.forEach(message => {
if (opt.verbose) console.error(chalk.blue('Enqueued job ') + message.MessageId + chalk.blue(' request ' + requestCount))
})
if (data?.Successful) {
for (const { Id, MessageId } of data.Successful) {
const result = resultsById.get(Id)
const message = messagesById.get(Id)
result.MessageId = MessageId
result.Id = Id
if (message?.MessageAttributes?.QdoneDeduplicationId?.StringValue) {
result.QdoneDeduplicationId = message?.MessageAttributes?.QdoneDeduplicationId?.StringValue
}
if (opt.verbose) console.error(chalk.blue('Enqueued job ') + MessageId + chalk.blue(' request ' + requestCount))
}
}
numFlushed += batch.length
}
return whileNotEmpty()
Expand All @@ -377,15 +412,16 @@ export async function flushMessages (qrl, opt, sendBuffer) {
// Automaticaly flushes if queue has >= 10 messages.
// Returns number of messages flushed.
//
const debugAddMessage = Debug('qdone:enqueue:addMessage')
export async function addMessage (qrl, command, messageIndex, opt, sendBuffer, messageOptions) {
const message = formatMessage(command, messageIndex, opt, messageOptions)
sendBuffer[qrl] = sendBuffer[qrl] || []
sendBuffer[qrl].push(message)
debug({ location: 'addMessage', sendBuffer })
debugAddMessage({ location: 'addMessage', messageIndex, sendBuffer })
if (sendBuffer[qrl].length >= 10) {
return flushMessages(qrl, opt, sendBuffer)
}
return 0
return { numFlushed: 0, results: [] }
}

//
Expand Down Expand Up @@ -418,6 +454,7 @@ export async function enqueueBatch (pairs, options) {
setExtra({ qdoneOperation: 'enqueueBatch', args: { pairs, opt } })

Check warning on line 454 in src/enqueue.js

View check run for this annotation

Codecov / codecov/patch

src/enqueue.js#L454

Added line #L454 was not covered by tests
}
try {
const allResults = []
// Find unique queues so we can pre-fetch qrls. We do this so that all
// queues are created prior to going through our flush logic
const normalizedPairs = pairs.map(({ queue, command, messageOptions }) => ({
Expand All @@ -441,19 +478,24 @@ export async function enqueueBatch (pairs, options) {
let initialFlushTotal = 0
for (const { qname, command, messageOptions } of normalizedPairs) {
const qrl = await getOrCreateQueue(qname, opt)
initialFlushTotal += await addMessage(qrl, command, messageIndex++, opt, sendBuffer, messageOptions)
const { numFlushed, results } = await addMessage(qrl, command, messageIndex++, opt, sendBuffer, messageOptions)
initialFlushTotal += numFlushed
allResults.push(...results)
}

// And flush any remaining messages
const extraFlushPromises = []
for (const qrl in sendBuffer) {
extraFlushPromises.push(flushMessages(qrl, opt, sendBuffer))
}
const extraFlushCounts = await Promise.all(extraFlushPromises)
const extraFlushTotal = extraFlushCounts.reduce((a, b) => a + b, 0)
let extraFlushTotal = 0
for (const { numFlushed, results } of await Promise.all(extraFlushPromises)) {
allResults.push(...results)
extraFlushTotal += numFlushed
}
const totalFlushed = initialFlushTotal + extraFlushTotal
debug({ initialFlushTotal, extraFlushTotal, totalFlushed })
return totalFlushed
return { numFlushed: totalFlushed, results: allResults }
} catch (e) {
console.log(e)
throw e

Check warning on line 501 in src/enqueue.js

View check run for this annotation

Codecov / codecov/patch

src/enqueue.js#L500-L501

Added lines #L500 - L501 were not covered by tests
Expand Down
Loading

0 comments on commit efb72b9

Please sign in to comment.