Skip to content

Commit

Permalink
fix: xread fixes (#1318)
Browse files Browse the repository at this point in the history
* fix: stream and id order in xread command

The syntax for an XREAD command is

```
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id
  [id ...]
```

Before this fix the implementation used

```
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key id [key id...]
```

The new approach should be equally fast and provide the correct results. Also
adjusted existing tests.

Links:
https://redis.io/commands/xread/

* fix: BLOCKING behaviour to not return immediately

It needs to wait until the timeElapsed is greater or eq the defined blocking time.
The test was enhanced to spot the issue and verify the fix worked. This
uncovered another issue with the way found events are evaluated to always return
even without any event received. This problem is also fixed by this commit.

---------

Co-authored-by: Sven Sterbling <[email protected]>
  • Loading branch information
ullumullu and ssterb authored Sep 29, 2023
1 parent d0cbea2 commit 3978302
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 15 deletions.
17 changes: 8 additions & 9 deletions src/commands/xread.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,9 @@ export function xread(option, ...args) {

// Turn ["stream1", "stream2", "id1", "id2"] into tuples of
// [["stream1", "id1"], ["stream2", "id2"]]
const toPoll = rest.reduce((memo, arg, i) => {
const chunk = Math.floor(i / 2)
const tuple = memo[chunk] || []
// eslint-disable-next-line no-param-reassign
memo[chunk] = tuple.concat(arg)
return memo
}, [])
const half = Math.ceil(rest.length / 2)
const streamsHalf = rest.slice(0, half)
const toPoll = streamsHalf.map((ele, index) => [ele, rest[half+index]])

const pollStream = (stream, id, count = 1) => {
const data = this.data.get(stream)
Expand All @@ -55,9 +51,12 @@ export function xread(option, ...args) {
let timeElapsed = 0
const f = () =>
setTimeout(() => {
if (opVal > 0 && timeElapsed < opVal) return resolve(null)
if (opVal > 0 && timeElapsed >= opVal) return resolve(null)
const events = pollEvents(toPoll, 1)
if (events.length > 0) return resolve(events)
// If any stream has a value return
if (events.find(event => event[1] && event[1].length > 0)) {
return resolve(events)
}
timeElapsed += 100
return f()
}, 100)
Expand Down
66 changes: 60 additions & 6 deletions test/integration/commands/xread.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ describe('xread', () => {
},
})
return redis
.xread('COUNT', '2', 'STREAMS', 'stream', '1-0', 'other-stream', '1-0')
.xread('COUNT', '2', 'STREAMS', 'stream', 'other-stream', '1-0', '1-0')
.then(events =>
expect(events).toEqual([
[
Expand Down Expand Up @@ -94,7 +94,7 @@ describe('xread', () => {
},
})
return redis
.xread('COUNT', '2', 'STREAMS', 'stream', '1', 'other-stream', '1')
.xread('COUNT', '2', 'STREAMS', 'stream', 'other-stream', '1', '1')
.then(events =>
expect(events).toEqual([
[
Expand Down Expand Up @@ -145,10 +145,64 @@ describe('xread', () => {
}
)

it('should block reads with a time out', () => {
return redis.xread('BLOCK', '500', 'STREAMS', 'stream', '$').then(row => {
expect(row).toBe(null)
})
it('should block reads on a stream with a time out', () => {
const redis = new Redis()
const before = performance.now()
return redis
.xread('BLOCK', '500', 'STREAMS', 'empty-stream', '$')
.then(row => {
const after = performance.now()
expect(after - before >= 500).toBe(true)
expect(row).toBe(null)
})
})

it('should block reads on multiple streams with a time out', () => {
const redis = new Redis()
const before = performance.now()
return redis
.xread(
'BLOCK',
'500',
'STREAMS',
'empty-stream',
'empty-stream-2',
'$',
'$'
)
.then(row => {
const after = performance.now()
expect(after - before >= 500).toBe(true)
expect(row).toBe(null)
})
})

it('should block until data is provided and return', () => {
const redis = new Redis()
const before = performance.now()

setTimeout(() => {
return redis.xadd('empty-stream-2', '*', 'key', 'val')
}, 100)

return redis
.xread(
'BLOCK',
'500',
'STREAMS',
'empty-stream',
'empty-stream-2',
'$',
'$'
)
.then(row => {
const after = performance.now()
expect(after - before >= 100).toBe(true)
expect(row).toEqual([
['empty-stream-2', [['1-0', ['key', 'val']]]],
['empty-stream', []],
])
})
})

// @TODO Rewrite test so it runs on a real Redis instance
Expand Down

0 comments on commit 3978302

Please sign in to comment.