diff --git a/src/commands/xread.js b/src/commands/xread.js index 3c1e406f..b028c0f5 100644 --- a/src/commands/xread.js +++ b/src/commands/xread.js @@ -22,13 +22,9 @@ export function xread(option, ...args) { // Turn ["stream1", "stream2", "id1", "id2"] into tuples of // [["stream1", "id1"], ["stream2", "id2"]] - const toPoll = rest.reduce((memo, arg, i) => { - const chunk = Math.floor(i / 2) - const tuple = memo[chunk] || [] - // eslint-disable-next-line no-param-reassign - memo[chunk] = tuple.concat(arg) - return memo - }, []) + const half = Math.ceil(rest.length / 2) + const streamsHalf = rest.slice(0, half) + const toPoll = streamsHalf.map((ele, index) => [ele, rest[half+index]]) const pollStream = (stream, id, count = 1) => { const data = this.data.get(stream) @@ -55,9 +51,12 @@ export function xread(option, ...args) { let timeElapsed = 0 const f = () => setTimeout(() => { - if (opVal > 0 && timeElapsed < opVal) return resolve(null) + if (opVal > 0 && timeElapsed >= opVal) return resolve(null) const events = pollEvents(toPoll, 1) - if (events.length > 0) return resolve(events) + // If any stream has a value return + if (events.find(event => event[1] && event[1].length > 0)) { + return resolve(events) + } timeElapsed += 100 return f() }, 100) diff --git a/test/integration/commands/xread.js b/test/integration/commands/xread.js index ac8e0d8c..a11d77d5 100644 --- a/test/integration/commands/xread.js +++ b/test/integration/commands/xread.js @@ -59,7 +59,7 @@ describe('xread', () => { }, }) return redis - .xread('COUNT', '2', 'STREAMS', 'stream', '1-0', 'other-stream', '1-0') + .xread('COUNT', '2', 'STREAMS', 'stream', 'other-stream', '1-0', '1-0') .then(events => expect(events).toEqual([ [ @@ -94,7 +94,7 @@ describe('xread', () => { }, }) return redis - .xread('COUNT', '2', 'STREAMS', 'stream', '1', 'other-stream', '1') + .xread('COUNT', '2', 'STREAMS', 'stream', 'other-stream', '1', '1') .then(events => expect(events).toEqual([ [ @@ -145,10 +145,64 @@ describe('xread', () => { } ) - it('should block reads with a time out', () => { - return redis.xread('BLOCK', '500', 'STREAMS', 'stream', '$').then(row => { - expect(row).toBe(null) - }) + it('should block reads on a stream with a time out', () => { + const redis = new Redis() + const before = performance.now() + return redis + .xread('BLOCK', '500', 'STREAMS', 'empty-stream', '$') + .then(row => { + const after = performance.now() + expect(after - before >= 500).toBe(true) + expect(row).toBe(null) + }) + }) + + it('should block reads on multiple streams with a time out', () => { + const redis = new Redis() + const before = performance.now() + return redis + .xread( + 'BLOCK', + '500', + 'STREAMS', + 'empty-stream', + 'empty-stream-2', + '$', + '$' + ) + .then(row => { + const after = performance.now() + expect(after - before >= 500).toBe(true) + expect(row).toBe(null) + }) + }) + + it('should block until data is provided and return', () => { + const redis = new Redis() + const before = performance.now() + + setTimeout(() => { + return redis.xadd('empty-stream-2', '*', 'key', 'val') + }, 100) + + return redis + .xread( + 'BLOCK', + '500', + 'STREAMS', + 'empty-stream', + 'empty-stream-2', + '$', + '$' + ) + .then(row => { + const after = performance.now() + expect(after - before >= 100).toBe(true) + expect(row).toEqual([ + ['empty-stream-2', [['1-0', ['key', 'val']]]], + ['empty-stream', []], + ]) + }) }) // @TODO Rewrite test so it runs on a real Redis instance