From e009e5301256ad5c988493e3ca73532842d29ba6 Mon Sep 17 00:00:00 2001 From: jakecastelli Date: Tue, 17 Dec 2024 15:12:08 +1030 Subject: [PATCH 1/2] Revert "stream: handle generator destruction from Duplex.from()" This reverts commit 55413004c8ab489a03793b80c20d2ec6552668c0. --- lib/internal/streams/duplexify.js | 59 +------ test/parallel/test-stream-duplex-from.js | 191 ----------------------- 2 files changed, 7 insertions(+), 243 deletions(-) diff --git a/lib/internal/streams/duplexify.js b/lib/internal/streams/duplexify.js index 76347cf5579448..3e026352f20432 100644 --- a/lib/internal/streams/duplexify.js +++ b/lib/internal/streams/duplexify.js @@ -83,19 +83,15 @@ module.exports = function duplexify(body, name) { } if (typeof body === 'function') { - let d; - - const { value, write, final, destroy } = fromAsyncGen(body, () => { - destroyer(d); - }); + const { value, write, final, destroy } = fromAsyncGen(body); // Body might be a constructor function instead of an async generator function. if (isDuplexNodeStream(value)) { - return d = value; + return value; } if (isIterable(value)) { - return d = from(Duplexify, value, { + return from(Duplexify, value, { // TODO (ronag): highWaterMark? objectMode: true, write, @@ -106,16 +102,12 @@ module.exports = function duplexify(body, name) { const then = value?.then; if (typeof then === 'function') { - let finalized = false; + let d; const promise = FunctionPrototypeCall( then, value, (val) => { - // The function returned without (fully) consuming the generator. - if (!finalized) { - destroyer(d); - } if (val != null) { throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val); } @@ -131,7 +123,6 @@ module.exports = function duplexify(body, name) { readable: false, write, final(cb) { - finalized = true; final(async () => { try { await promise; @@ -217,12 +208,11 @@ module.exports = function duplexify(body, name) { body); }; -function fromAsyncGen(fn, destructor) { +function fromAsyncGen(fn) { let { promise, resolve } = PromiseWithResolvers(); const ac = new AbortController(); const signal = ac.signal; - - const asyncGenerator = (async function* () { + const value = fn(async function*() { while (true) { const _promise = promise; promise = null; @@ -232,44 +222,9 @@ function fromAsyncGen(fn, destructor) { if (signal.aborted) throw new AbortError(undefined, { cause: signal.reason }); ({ promise, resolve } = PromiseWithResolvers()); - // Next line will "break" the loop if the generator is returned/thrown. yield chunk; } - })(); - - const originalReturn = asyncGenerator.return; - asyncGenerator.return = async function(value) { - try { - return await originalReturn.call(this, value); - } finally { - if (promise) { - const _promise = promise; - promise = null; - const { cb } = await _promise; - process.nextTick(cb); - - process.nextTick(destructor); - } - } - }; - - const originalThrow = asyncGenerator.throw; - asyncGenerator.throw = async function(err) { - try { - return await originalThrow.call(this, err); - } finally { - if (promise) { - const _promise = promise; - promise = null; - const { cb } = await _promise; - - // asyncGenerator.throw(undefined) should cause a callback error - process.nextTick(cb, err ?? new AbortError()); - } - } - }; - - const value = fn(asyncGenerator, { signal }); + }(), { signal }); return { value, diff --git a/test/parallel/test-stream-duplex-from.js b/test/parallel/test-stream-duplex-from.js index dc54ef49c8fba3..e3c117ff8dedb0 100644 --- a/test/parallel/test-stream-duplex-from.js +++ b/test/parallel/test-stream-duplex-from.js @@ -5,7 +5,6 @@ const assert = require('assert'); const { Duplex, Readable, Writable, pipeline, PassThrough } = require('stream'); const { ReadableStream, WritableStream } = require('stream/web'); const { Blob } = require('buffer'); -const sleep = require('util').promisify(setTimeout); { const d = Duplex.from({ @@ -402,193 +401,3 @@ function makeATestWritableStream(writeFunc) { assert.strictEqual(d.writable, false); })); } - -{ - const r = Readable.from(['foo', 'bar', 'baz']); - pipeline( - r, - Duplex.from(async function(asyncGenerator) { - const values = await Array.fromAsync(asyncGenerator); - assert.deepStrictEqual(values, ['foo', 'bar', 'baz']); - - await asyncGenerator.return(); - await asyncGenerator.return(); - await asyncGenerator.return(); - }), - common.mustSucceed(() => { - assert.strictEqual(r.destroyed, true); - }) - ); -} - -{ - const r = Readable.from(['foo', 'bar', 'baz']); - pipeline( - r, - Duplex.from(async function(asyncGenerator) { - // eslint-disable-next-line no-unused-vars - for await (const _ of asyncGenerator) break; - }), - common.mustSucceed(() => { - assert.strictEqual(r.destroyed, true); - }) - ); -} - -{ - const r = Readable.from(['foo', 'bar', 'baz']); - pipeline( - r, - Duplex.from(async function(asyncGenerator) { - const a = await asyncGenerator.next(); - assert.strictEqual(a.done, false); - assert.strictEqual(a.value.toString(), 'foo'); - const b = await asyncGenerator.return(); - assert.strictEqual(b.done, true); - }), - common.mustSucceed(() => { - assert.strictEqual(r.destroyed, true); - }) - ); -} - -{ - const r = Readable.from(['foo', 'bar', 'baz']); - pipeline( - r, - Duplex.from(async function(asyncGenerator) { - // Note: the generator is not even started at this point - await asyncGenerator.return(); - }), - common.mustSucceed(() => { - assert.strictEqual(r.destroyed, true); - }) - ); -} - -{ - const r = Readable.from(['foo', 'bar', 'baz']); - pipeline( - r, - Duplex.from(async function(asyncGenerator) { - // Same as before, with a delay - await sleep(100); - await asyncGenerator.return(); - }), - common.mustSucceed(() => { - assert.strictEqual(r.destroyed, true); - }) - ); -} - -{ - const r = Readable.from(['foo', 'bar', 'baz']); - pipeline( - r, - Duplex.from(async function(asyncGenerator) {}), - common.mustCall((err) => { - assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); - assert.strictEqual(r.destroyed, true); - }) - ); -} - -{ - const r = Readable.from(['foo', 'bar', 'baz']); - pipeline( - r, - Duplex.from(async function(asyncGenerator) { - await sleep(100); - }), - common.mustCall((err) => { - assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); - assert.strictEqual(r.destroyed, true); - }) - ); -} - -{ - const r = Readable.from(['foo', 'bar', 'baz']); - const d = Duplex.from(async function(asyncGenerator) { - while (!(await asyncGenerator.next()).done) await sleep(100); - }); - - setTimeout(() => d.destroy(), 150); - - pipeline( - r, - d, - common.mustCall((err) => { - assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); - assert.strictEqual(r.destroyed, true); - }) - ); -} - -{ - const r = Duplex.from(async function* () { - for (const value of ['foo', 'bar', 'baz']) { - await sleep(50); - yield value; - } - }); - const d = Duplex.from(async function(asyncGenerator) { - while (!(await asyncGenerator.next()).done); - }); - - setTimeout(() => r.destroy(), 75); - - pipeline( - r, - d, - common.mustCall((err) => { - assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); - assert.strictEqual(r.destroyed, true); - assert.strictEqual(d.destroyed, true); - }) - ); -} - -{ - const r = Readable.from(['foo']); - pipeline( - r, - Duplex.from(async function(asyncGenerator) { - await asyncGenerator.throw(new Error('my error')); - }), - common.mustCall((err) => { - assert.strictEqual(err.message, 'my error'); - assert.strictEqual(r.destroyed, true); - }) - ); -} - -{ - const r = Readable.from(['foo', 'bar']); - pipeline( - r, - Duplex.from(async function(asyncGenerator) { - await asyncGenerator.next(); - await asyncGenerator.throw(new Error('my error')); - }), - common.mustCall((err) => { - assert.strictEqual(err.message, 'my error'); - assert.strictEqual(r.destroyed, true); - }) - ); -} - -{ - const r = Readable.from(['foo', 'bar']); - pipeline( - r, - Duplex.from(async function(asyncGenerator) { - await asyncGenerator.next(); - await asyncGenerator.throw(); - }), - common.mustCall((err) => { - assert.strictEqual(err.code, 'ABORT_ERR'); - assert.strictEqual(r.destroyed, true); - }) - ); -} From 175b3aa179d4ea9210ca3dcad40f68dce977ed6d Mon Sep 17 00:00:00 2001 From: jakecastelli Date: Tue, 17 Dec 2024 15:20:54 +1030 Subject: [PATCH 2/2] test: add coverage for pipeline co-authored-by: jazelly --- test/parallel/test-stream-pipeline.js | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 2ee323a934606e..2bbdabe9d347b1 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1723,3 +1723,30 @@ tmpdir.refresh(); }); src.destroy(new Error('problem')); } + +{ + async function* myAsyncGenerator(ag) { + for await (const data of ag) { + yield data; + } + } + + const duplexStream = Duplex.from(myAsyncGenerator); + + const r = new Readable({ + read() { + this.push('data1\n'); + throw new Error('booom'); + }, + }); + + const w = new Writable({ + write(chunk, encoding, callback) { + callback(); + }, + }); + + pipeline(r, duplexStream, w, common.mustCall((err) => { + assert.deepStrictEqual(err, new Error('booom')); + })); +}