Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: prevent hang when process is overwritten #83

Merged
merged 1 commit into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions src/entry/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ process.__tinypool_state__ = {
workerId: Number(process.env.TINYPOOL_WORKER_ID),
}

const memoryUsage = process.memoryUsage.bind(process)
const send = process.send!.bind(process)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this stabilizes the process and keeps the same reference, correct?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we will preserve the original process methods in the module scope. The file that is loaded later may overwrite globals:

if (filename !== null) {
await getHandler(filename, name)

So basically:

process.on("message", async (msg) => {
  const originalSend = process.send.bind(process);

  await import(msg.filename); // Contains globalThis.process = Mock;

  process.send(response);
  //      ^^^^ send is not a function

  originalSend(response);
  // ^^ works
})

birpc has similar implementation: https://github.com/antfu/birpc/blob/72510536cedb301953e0550d884667804d7268d0/src/index.ts#L145-L147


process.on('message', (message: IncomingMessage) => {
// Message was not for port or pool
// It's likely end-users own communication between main and worker
Expand All @@ -36,7 +39,7 @@ process.on('message', (message: IncomingMessage) => {
await getHandler(filename, name)
}

process.send!(<OutgoingMessage>{
send!(<OutgoingMessage>{
ready: true,
source: 'pool',
__tinypool_worker_message__: true,
Expand Down Expand Up @@ -69,7 +72,7 @@ async function onMessage(message: IncomingMessage & { source: 'port' }) {
taskId,
result,
error: null,
usedMemory: process.memoryUsage().heapUsed,
usedMemory: memoryUsage().heapUsed,
}

// If the task used e.g. console.log(), wait for the stream to drain
Expand All @@ -89,11 +92,11 @@ async function onMessage(message: IncomingMessage & { source: 'port' }) {
taskId,
result: null,
error: serializeError(error),
usedMemory: process.memoryUsage().heapUsed,
usedMemory: memoryUsage().heapUsed,
}
}

process.send!(response)
send!(response)
}

function serializeError(error: unknown) {
Expand Down
5 changes: 3 additions & 2 deletions src/entry/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ process.__tinypool_state__ = {
workerId: tinypoolPrivateData.workerId,
}

const memoryUsage = process.memoryUsage.bind(process)
let useAtomics: boolean = process.env.PISCINA_DISABLE_ATOMICS !== '1'

// We should only receive this message once, when the Worker starts. It gives
Expand Down Expand Up @@ -110,7 +111,7 @@ function onMessage(
taskId,
result: result,
error: null,
usedMemory: process.memoryUsage().heapUsed,
usedMemory: memoryUsage().heapUsed,
}

// If the task used e.g. console.log(), wait for the stream to drain
Expand All @@ -130,7 +131,7 @@ function onMessage(
// It may be worth taking a look at the error cloning algorithm we
// use in Node.js core here, it's quite a bit more flexible
error,
usedMemory: process.memoryUsage().heapUsed,
usedMemory: memoryUsage().heapUsed,
}
}
currentTasks--
Expand Down
32 changes: 32 additions & 0 deletions test/globals.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import * as path from 'path'
import { fileURLToPath } from 'url'
import { Tinypool } from 'tinypool'

const __dirname = path.dirname(fileURLToPath(import.meta.url))

describe.each(['worker_threads', 'child_process'] as const)('%s', (runtime) => {
test("doesn't hang when process is overwritten", async () => {
const pool = createPool({ runtime })

const result = await pool.run(`
(async () => {
return new Promise(resolve => {
globalThis.process = { exit: resolve };
process.exit("exit() from overwritten process");
});
})();
`)
expect(result).toBe('exit() from overwritten process')
})
})

function createPool(options: Partial<Tinypool['options']>) {
const pool = new Tinypool({
filename: path.resolve(__dirname, 'fixtures/eval.js'),
minThreads: 1,
maxThreads: 1,
...options,
})

return pool
}
Loading