Skip to content

Commit dc6be1e

Browse files
committed
feat: worker_threads
- fix bug for when there was multiple workers defined in the same file - added a new function that behave like an async function but run in parallel - added a new helper to execute a function in background
1 parent 83d2347 commit dc6be1e

File tree

5 files changed

+97
-32
lines changed

5 files changed

+97
-32
lines changed

stdlib/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
export { Schema } from '@effect/schema'
22

3+
export * from './lib/execParallel.ts'
4+
export * from './lib/executionOrder.ts'
35
export * from './lib/pubSub.ts'
46
export * from './lib/queue.ts'
57
export * from './lib/test.ts'

stdlib/lib/execParallel.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { fileURLToPath } from 'node:url'
2+
import { Worker, isMainThread, parentPort, workerData } from 'node:worker_threads'
3+
import { getExecutionId } from './executionOrder.ts'
4+
import { fork, resolvablePromise } from './utils.ts'
5+
6+
export function execParallel<Props, ReturnType>(filePath: string, fn: (props: Props) => ReturnType) {
7+
const { executionId } = getExecutionId(filePath)
8+
9+
if (isMainThread && process.env['execParallelId'] === executionId) {
10+
fork(async () => {
11+
const res = await fn(workerData)
12+
parentPort?.postMessage(res)
13+
14+
process.exit(0)
15+
})
16+
}
17+
18+
return (props: Props) => {
19+
const worker = new Worker(fileURLToPath(filePath), {
20+
workerData: props,
21+
env: { ...process.env, execParallelId: executionId }
22+
})
23+
24+
const { promise, resolvePromise } = resolvablePromise<ReturnType>()
25+
26+
worker.on('message', data => {
27+
resolvePromise(data)
28+
})
29+
30+
return promise
31+
}
32+
}

stdlib/lib/executionOrder.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
const executionMap = new Map<string, number>()
2+
3+
/**
4+
* Return an Id that it's consistent for every execution
5+
* It's based on the order of execution
6+
*/
7+
export const getExecutionId = (fileName: string) => {
8+
executionMap.get(fileName) ?? executionMap.set(fileName, 0)
9+
10+
const index = executionMap.get(fileName) ?? 0
11+
executionMap.set(fileName, index + 1)
12+
13+
return { executionId: index.toString() }
14+
}

stdlib/lib/utils.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,14 @@ export function resolvablePromise<T>() {
3030
return { resolvePromise, promise }
3131
}
3232

33+
export async function fork<ReturnType extends Promise<any> | any>(fn: (index: number) => ReturnType, instances = 1) {
34+
const promises: ReturnType[] = []
35+
36+
for (let i = 0; i < instances; i++) promises.push(fn(i))
37+
38+
return await Promise.all(promises)
39+
}
40+
3341
export declare namespace random {
3442
type Props = { min: number; max: number; step: number }
3543
}

stdlib/lib/workerThreads.ts

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { fileURLToPath } from 'node:url'
22
import { Worker, isMainThread, parentPort, workerData as rawWorkerData } from 'node:worker_threads'
33
import { Schema } from '@effect/schema'
4+
import { getExecutionId } from './executionOrder.ts'
45
import { Queue } from './queue.ts'
56

67
export declare namespace workerThreads {
@@ -25,49 +26,57 @@ export declare namespace workerThreads {
2526
}
2627

2728
export async function workerThreads<Schema extends workerThreads.Schema>(props: workerThreads.Props<Schema>) {
28-
if (isMainThread) {
29-
const spawnWorker = (workerData: Schema['init']['Encoded']) => {
30-
const worker = new Worker(fileURLToPath(props.filePath), { workerData })
29+
const { executionId } = getExecutionId(props.filePath)
3130

32-
const queue = new Queue<Schema['main']['Type']>()
33-
worker.on('message', rawData => {
34-
const data = Schema.decodeSync(props.schema.main)(rawData)
35-
void queue.publish(data)
36-
})
31+
if (isMainThread && process.env['workerThreadsId'] === executionId) {
32+
const worker = parentPort!
33+
const workerData = Schema.decodeSync(props.schema.init)(rawWorkerData)
3734

38-
worker.on('exit', () => queue.close())
39-
worker.on('error', () => queue.close())
40-
worker.on('messageerror', () => queue.close())
35+
const queue = new Queue<Schema['worker']['Type']>()
36+
worker.on('message', rawData => {
37+
const data = Schema.decodeSync(props.schema.worker)(rawData)
38+
void queue.publish(data)
39+
})
4140

42-
const send = async (rawData: Schema['worker']['Encoded']) => {
43-
const data = await Schema.encodePromise(props.schema.worker)(rawData)
44-
worker.postMessage(data)
45-
}
41+
worker.on('exit', () => queue.close())
42+
worker.on('error', () => queue.close())
43+
worker.on('messageerror', () => queue.close())
4644

47-
return { queue, send, terminate: worker.terminate }
45+
const send = async (rawData: Schema['main']['Encoded']) => {
46+
const data = await Schema.encodePromise(props.schema.main)(rawData)
47+
worker.postMessage(data)
4848
}
4949

50-
return { spawnWorker }
50+
await props.worker({ queue, send, workerData, terminate: process.exit })
51+
process.exit(0)
5152
}
5253

53-
const worker = parentPort!
54-
const workerData = Schema.decodeSync(props.schema.init)(rawWorkerData)
54+
function spawnWorker(workerData: Schema['init']['Encoded']) {
55+
const worker = new Worker(fileURLToPath(props.filePath), {
56+
workerData,
57+
env: {
58+
...process.env,
59+
workerThreadsId: executionId
60+
}
61+
})
62+
63+
const queue = new Queue<Schema['main']['Type']>()
64+
worker.on('message', rawData => {
65+
const data = Schema.decodeSync(props.schema.main)(rawData)
66+
void queue.publish(data)
67+
})
5568

56-
const queue = new Queue<Schema['worker']['Type']>()
57-
worker.on('message', rawData => {
58-
const data = Schema.decodeSync(props.schema.worker)(rawData)
59-
void queue.publish(data)
60-
})
69+
worker.on('exit', () => queue.close())
70+
worker.on('error', () => queue.close())
71+
worker.on('messageerror', () => queue.close())
6172

62-
worker.on('exit', () => queue.close())
63-
worker.on('error', () => queue.close())
64-
worker.on('messageerror', () => queue.close())
73+
const send = async (rawData: Schema['worker']['Encoded']) => {
74+
const data = await Schema.encodePromise(props.schema.worker)(rawData)
75+
worker.postMessage(data)
76+
}
6577

66-
const send = async (rawData: Schema['main']['Encoded']) => {
67-
const data = await Schema.encodePromise(props.schema.main)(rawData)
68-
worker.postMessage(data)
78+
return { queue, send, terminate: worker.terminate }
6979
}
7080

71-
await props.worker({ queue, send, workerData, terminate: process.exit })
72-
process.exit(0)
81+
return { spawnWorker }
7382
}

0 commit comments

Comments
 (0)