From 3b696b64e6a6a38f3cd7c6e1748d1156117fb899 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ari=20Perkki=C3=B6?= Date: Tue, 15 Oct 2024 18:10:54 +0300 Subject: [PATCH] fix(child_process): pipe `stdout` and `stderr` to main thread --- src/index.ts | 2 ++ src/runtime/process-worker.ts | 19 ++++++++++++ test/fixtures/stdio.mjs | 4 +++ test/worker-stdio.test.ts | 56 +++++++++++++++++++++++++++++++++++ 4 files changed, 81 insertions(+) create mode 100644 test/fixtures/stdio.mjs create mode 100644 test/worker-stdio.test.ts diff --git a/src/index.ts b/src/index.ts index 2309ed8..38d4862 100644 --- a/src/index.ts +++ b/src/index.ts @@ -780,6 +780,7 @@ class ThreadPool { }) worker.on('error', (err: Error) => { + console.log('Worker errored', err) // Work around the bug in https://github.com/nodejs/node/pull/33394 worker.ref = () => {} @@ -807,6 +808,7 @@ class ThreadPool { } else { this.publicInterface.emit('error', err) } + this.publicInterface.emit('error', err) }) worker.unref() diff --git a/src/runtime/process-worker.ts b/src/runtime/process-worker.ts index eadc2cb..72d25e9 100644 --- a/src/runtime/process-worker.ts +++ b/src/runtime/process-worker.ts @@ -26,12 +26,16 @@ export default class ProcessWorker implements TinypoolWorker { options.argv, { ...options, + stdio: 'pipe', env: { ...options.env, TINYPOOL_WORKER_ID: options.workerData[0].workerId.toString(), }, } ) + this.process.stdout!.pipe(process.stdout) + this.process.stderr!.pipe(process.stderr) + this.threadId = this.process.pid! this.process.on('exit', this.onUnexpectedExit) @@ -136,6 +140,21 @@ export default class ProcessWorker implements TinypoolWorker { // This requires manual unreffing of its channel. this.process.channel?.unref() + if (hasUnref(this.process.stdout)) { + this.process.stdout.unref() + } + + if (hasUnref(this.process.stderr)) { + this.process.stderr.unref() + } + return this.process.unref() } } + +// unref is untyped for some reason +function hasUnref(stream: null | object): stream is { unref: () => void } { + return ( + stream != null && 'unref' in stream && typeof stream.unref === 'function' + ) +} diff --git a/test/fixtures/stdio.mjs b/test/fixtures/stdio.mjs new file mode 100644 index 0000000..96e4206 --- /dev/null +++ b/test/fixtures/stdio.mjs @@ -0,0 +1,4 @@ +export default function run() { + process.stdout.write('Worker message') + process.stderr.write('Worker error') +} diff --git a/test/worker-stdio.test.ts b/test/worker-stdio.test.ts new file mode 100644 index 0000000..d39c777 --- /dev/null +++ b/test/worker-stdio.test.ts @@ -0,0 +1,56 @@ +import * as path from 'node:path' +import { fileURLToPath } from 'node:url' +import { stripVTControlCharacters } from 'node:util' +import { Tinypool } from 'tinypool' + +const runtimes = ['worker_threads', 'child_process'] as const +const __dirname = path.dirname(fileURLToPath(import.meta.url)) + +test.each(runtimes)( + "worker's stdout and stderr are piped to main thread when { runtime: '%s' }", + async (runtime) => { + const pool = createPool({ + runtime, + minThreads: 1, + maxThreads: 1, + }) + + const getStdout = captureStandardStream('stdout') + const getStderr = captureStandardStream('stderr') + + await pool.run({}) + + const stdout = getStdout() + const stderr = getStderr() + + expect(stdout).toBe('Worker message') + + expect(stderr).toBe('Worker error') + } +) + +function createPool(options: Partial) { + const pool = new Tinypool({ + filename: path.resolve(__dirname, 'fixtures/stdio.mjs'), + minThreads: 1, + maxThreads: 1, + ...options, + }) + + return pool +} + +function captureStandardStream(type: 'stdout' | 'stderr') { + const spy = vi.fn() + + // eslint-disable-next-line @typescript-eslint/unbound-method + const original = process[type].write + process[type].write = spy + + return function collect() { + process[type].write = original + return stripVTControlCharacters( + spy.mock.calls.map((call) => call[0]).join('') + ) + } +}