Skip to content

Commit

Permalink
fix(child_process): pipe stdout and stderr to main thread
Browse files Browse the repository at this point in the history
  • Loading branch information
AriPerkkio committed Oct 15, 2024
1 parent 1f5c56e commit bb88462
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 0 deletions.
19 changes: 19 additions & 0 deletions src/runtime/process-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@ export default class ProcessWorker implements TinypoolWorker {
options.argv,
{
...options,
stdio: 'pipe',
env: {
...options.env,
TINYPOOL_WORKER_ID: options.workerData[0].workerId.toString(),
},
}
)
this.process.stdout!.pipe(process.stdout)
this.process.stderr!.pipe(process.stderr)

this.threadId = this.process.pid!

this.process.on('exit', this.onUnexpectedExit)
Expand Down Expand Up @@ -136,6 +140,21 @@ export default class ProcessWorker implements TinypoolWorker {
// This requires manual unreffing of its channel.
this.process.channel?.unref()

if (hasUnref(this.process.stdout)) {
this.process.stdout.unref()
}

if (hasUnref(this.process.stderr)) {
this.process.stderr.unref()
}

return this.process.unref()
}
}

// unref is untyped for some reason
function hasUnref(stream: null | object): stream is { unref: () => void } {
return (
stream != null && 'unref' in stream && typeof stream.unref === 'function'
)
}
4 changes: 4 additions & 0 deletions test/fixtures/stdio.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export default function run() {
process.stdout.write('Worker message')
process.stderr.write('Worker error')
}
56 changes: 56 additions & 0 deletions test/worker-stdio.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import * as path from 'node:path'
import { fileURLToPath } from 'node:url'
import { stripVTControlCharacters } from 'node:util'
import { Tinypool } from 'tinypool'

const runtimes = ['worker_threads', 'child_process'] as const
const __dirname = path.dirname(fileURLToPath(import.meta.url))

test.each(runtimes)(
"worker's stdout and stderr are piped to main thread when { runtime: '%s' }",
async (runtime) => {
const pool = createPool({
runtime,
minThreads: 1,
maxThreads: 1,
})

const getStdout = captureStandardStream('stdout')
const getStderr = captureStandardStream('stderr')

await pool.run({})

const stdout = getStdout()
const stderr = getStderr()

expect(stdout).toBe('Worker message')

expect(stderr).toBe('Worker error')

Check failure on line 28 in test/worker-stdio.test.ts

View workflow job for this annotation

GitHub Actions / Test (ubuntu-latest, 20.x)

test/worker-stdio.test.ts > worker's stdout and stderr are piped to main thread when { runtime: 'child_process' }

AssertionError: expected '(node:2177) MaxListenersExceededWarni…' to be 'Worker error' // Object.is equality - Expected + Received + (node:2177) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 62 unpipe listeners added to [Socket]. MaxListeners is 10. Use emitter.setMaxListeners() to increase limit + (Use `node --trace-warnings ...` to show where the warning was created) + (node:2177) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 62 error listeners added to [Socket]. MaxListeners is 10. Use emitter.setMaxListeners() to increase limit + (node:2177) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 62 close listeners added to [Socket]. MaxListeners is 10. Use emitter.setMaxListeners() to increase limit + (node:2177) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 62 finish listeners added to [Socket]. MaxListeners is 10. Use emitter.setMaxListeners() to increase limit + (node:2177) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 62 unpipe listeners added to [Socket]. MaxListeners is 10. Use emitter.setMaxListeners() to increase limit + (node:2177) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 62 error listeners added to [Socket]. MaxListeners is 10. Use emitter.setMaxListeners() to increase limit + (node:2177) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 62 close listeners added to [Socket]. MaxListeners is 10. Use emitter.setMaxListeners() to increase limit + (node:2177) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 62 finish listeners added to [Socket]. MaxListeners is 10. Use emitter.setMaxListeners() to increase limit Worker error ❯ test/worker-stdio.test.ts:28:20
}
)

function createPool(options: Partial<Tinypool['options']>) {
const pool = new Tinypool({
filename: path.resolve(__dirname, 'fixtures/stdio.mjs'),
minThreads: 1,
maxThreads: 1,
...options,
})

return pool
}

function captureStandardStream(type: 'stdout' | 'stderr') {
const spy = vi.fn()

// eslint-disable-next-line @typescript-eslint/unbound-method
const original = process[type].write
process[type].write = spy

return function collect() {
process[type].write = original
return stripVTControlCharacters(
spy.mock.calls.map((call) => call[0]).join('')
)
}
}

0 comments on commit bb88462

Please sign in to comment.