Skip to content
This repository was archived by the owner on May 20, 2024. It is now read-only.

Commit 3ba43a3

Browse files
committed
bug: correctly implement multiple reads
1 parent afb813e commit 3ba43a3

File tree

4 files changed

+68
-46
lines changed

4 files changed

+68
-46
lines changed

index.js

+41-21
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,61 @@
1+
const debug = require('debug')('aws-api-read-stream')
12
const { Readable, finished } = require('stream')
23

4+
const STOPPED = 'stopped'
5+
const READING = 'reading'
6+
const DONE = 'done'
7+
38
class AWSApiReadStream extends Readable {
49
constructor(fn, opts, nextToken) {
510
super({ ...opts, objectMode: true })
611

712
this._fn = fn
813
this._nextToken = nextToken
14+
this._state = STOPPED
915
}
1016

11-
async _read(size) {
12-
let shouldContinue = false
13-
do {
14-
try {
15-
const res = await this._fn(this._nextToken)
17+
_read(size) {
18+
if (this._state === READING) return
19+
debug('_read', size, 'nextToken', this._nextToken)
20+
this._execApiCall()
21+
}
1622

17-
if (!res) {
18-
this.push(null)
19-
return
20-
}
23+
async _execApiCall() {
24+
this._state = READING
25+
try {
26+
const res = await this._fn(this._nextToken)
27+
28+
if (!res) {
29+
this._apiExecutionDone()
30+
return
31+
}
2132

22-
if (this._isInBufferMode()) {
23-
this._buffer.push(res)
24-
}
33+
if (this._isInBufferMode()) {
34+
this._buffer.push(res)
35+
}
2536

26-
shouldContinue = this.push(res)
27-
this._nextToken = res.NextToken || res.NextContinuationToken
37+
this._nextToken = res.NextToken || res.NextContinuationToken
2838

29-
if (!this._nextToken) {
30-
this.push(null)
31-
return
32-
}
39+
if (!this._nextToken) {
40+
this._apiExecutionDone()
41+
return
42+
}
3343

34-
} catch (e) {
35-
this.destroy(e)
44+
if (!this.push(res)) {
45+
this._state = STOPPED
3646
return
3747
}
38-
} while (shouldContinue)
48+
49+
this._execApiCall()
50+
} catch (e) {
51+
this.destroy(e)
52+
}
53+
}
54+
55+
_apiExecutionDone() {
56+
this._state = DONE
57+
this._nextToken = undefined
58+
this.push(null)
3959
}
4060

4161
stop() {

package-lock.json

+1-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+4-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"description": "turns an aws api call into a readable stream",
55
"main": "index.js",
66
"scripts": {
7-
"test": "npx ava"
7+
"test": "env DEBUG=aws-api-read-stream npx ava"
88
},
99
"repository": {
1010
"type": "git",
@@ -25,5 +25,8 @@
2525
"devDependencies": {
2626
"ava": "^3.5.2",
2727
"aws-sdk": "^2.649.0"
28+
},
29+
"dependencies": {
30+
"debug": "^4.1.1"
2831
}
2932
}

test.js

+22-21
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
const test = require('ava')
22
const AWSApiReadStream = require('./index')
3+
const tokens = 'abcdefghij'.split('')
34

45
test('AWSApiReadStream', async t => {
56
const testapi = new TestAPI()
@@ -9,35 +10,35 @@ test('AWSApiReadStream', async t => {
910

1011
for await (const { data, NextToken } of stream) {
1112
t.is(data, count)
12-
13-
if (count < 10) {
14-
t.is(NextToken, count + 1)
15-
16-
t.is(testapi.nextTokenSpy, count + 1)
17-
18-
} else {
19-
t.is(NextToken, undefined)
20-
}
21-
2213
count++
2314
}
15+
16+
t.deepEqual(testapi.spy, [
17+
{ nextToken: undefined, data: 0 },
18+
{ nextToken: 'a', data: 1 },
19+
{ nextToken: 'b', data: 2 },
20+
{ nextToken: 'c', data: 3 },
21+
{ nextToken: 'd', data: 4 },
22+
{ nextToken: 'e', data: 5 },
23+
{ nextToken: 'f', data: 6 },
24+
{ nextToken: 'g', data: 7 },
25+
{ nextToken: 'h', data: 8 },
26+
{ nextToken: 'i', data: 9 },
27+
{ nextToken: 'j', data: 10 }
28+
])
2429
})
2530

2631
test('AWSApiReadStream - initialize with existing token', async t => {
2732
const testapi = new TestAPI()
28-
29-
let count = 0
30-
31-
const stream = AWSApiReadStream.from(nextToken => testapi.anAPICall(nextToken), { nextToken: 5 })
33+
const stream = AWSApiReadStream.from(nextToken => testapi.anAPICall(nextToken), { nextToken: 'f' })
3234

3335
// can't think of a better way to test this other than check internals...
34-
t.is(stream._nextToken, 5)
36+
t.is(stream._nextToken, 'f')
3537
})
3638

3739
test('AWSApiReadStream - returning null or undefined will stop execution', async t => {
3840
const testapi = new TestAPI()
39-
const stream = AWSApiReadStream.from(nextToken => nextToken === 2 ? null : testapi.anAPICall(nextToken))
40-
41+
const stream = AWSApiReadStream.from(nextToken => nextToken === 'b' ? null : testapi.anAPICall(nextToken))
4142

4243
const results = []
4344
for await (const { data, NextToken } of stream) {
@@ -50,19 +51,19 @@ test.skip('AWSApiReadStream - backpressure', async t => {
5051

5152
})
5253

53-
5454
class TestAPI {
5555
constructor() {
5656
this._counter = 0
57+
this.spy = []
5758
}
5859

5960
anAPICall(nextToken) {
60-
this.nextTokenSpy = nextToken
61+
this.spy.push({ nextToken, data: this._counter })
6162
return new Promise(res => {
6263
setImmediate(() => {
6364
res({
64-
data: this._counter,
65-
NextToken: this._counter < 10 ? ++this._counter : undefined
65+
NextToken: tokens[this._counter],
66+
data: this._counter++
6667
})
6768
})
6869
})

0 commit comments

Comments
 (0)