From d93f30cd587a2204cd84f3d2837fa60a4a8d2748 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petter=20H=C3=A4ggholm?= Date: Tue, 4 Feb 2020 16:43:53 -0800 Subject: [PATCH 1/2] feat: allow overriding default spawn init timeout via parameter rather than environment variable --- src/master/spawn.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/master/spawn.ts b/src/master/spawn.ts index 11d5dfa2..ddb89335 100644 --- a/src/master/spawn.ts +++ b/src/master/spawn.ts @@ -137,13 +137,16 @@ function setPrivateThreadProps(raw: T, worker: WorkerType, workerEvents: Obse * the worker has initialized successfully. * * @param worker Instance of `Worker`. Either a web worker, `worker_threads` worker or `tiny-worker` worker. + * @param [options] + * @param [options.timeout] Init message timeout. Default: 10000 or set by environment variable. */ export async function spawn = ArbitraryWorkerInterface>( - worker: WorkerType + worker: WorkerType, + options?: { timeout?: number } ): Promise> { debugSpawn("Initializing new thread") - const initMessage = await withTimeout(receiveInitMessage(worker), initMessageTimeout, `Timeout: Did not receive an init message from worker after ${initMessageTimeout}ms. Make sure the worker calls expose().`) + const initMessage = await withTimeout(receiveInitMessage(worker), options && options.timeout ? options.timeout : initMessageTimeout, `Timeout: Did not receive an init message from worker after ${initMessageTimeout}ms. Make sure the worker calls expose().`) const exposed = initMessage.exposed const { termination, terminate } = createTerminator(worker) From 9a851e5619ed364a5c7eb4bcfc7b5db34d692f50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petter=20H=C3=A4ggholm?= Date: Mon, 20 Apr 2020 17:37:51 -0700 Subject: [PATCH 2/2] feat: dynamic pools: free unused workers after a configurable interval --- src/master/pool-types.ts | 1 + src/master/pool.ts | 74 ++++++++++++++++++++++++++++++++++++---- 2 files changed, 68 insertions(+), 7 deletions(-) diff --git a/src/master/pool-types.ts b/src/master/pool-types.ts index 83c1f4fd..ae9ec71d 100644 --- a/src/master/pool-types.ts +++ b/src/master/pool-types.ts @@ -48,6 +48,7 @@ export type PoolEvent = { export interface WorkerDescriptor { init: Promise runningTasks: Array> + lastActivity: number } /** diff --git a/src/master/pool.ts b/src/master/pool.ts index b0a6ff48..d646fd37 100644 --- a/src/master/pool.ts +++ b/src/master/pool.ts @@ -48,10 +48,13 @@ function spawnWorkers( spawnWorker: () => Promise, count: number ): Array> { - return createArray(count).map((): WorkerDescriptor => ({ - init: spawnWorker(), - runningTasks: [] - })) + return createArray(count).map( + (): WorkerDescriptor => ({ + init: spawnWorker(), + runningTasks: [], + lastActivity: Date.now() + }) + ) } /** @@ -109,6 +112,15 @@ export interface PoolOptions { /** No. of worker threads to spawn and to be managed by the pool. */ size?: number + + /** For dynamically sized pool, the minimum number of active workers. */ + minSize?: number + + /** For dynamically sized pool, maximum idle lifetime of worker threads. */ + idleTimeoutMillis?: number + + /** For dynamically sized pool, frequency of cleaning up idle threads. */ + idleCleanupIntervalMillis?: number } class WorkerPool implements Pool { @@ -124,6 +136,11 @@ class WorkerPool implements Pool { private isClosing = false private nextTaskID = 1 private taskQueue: Array> = [] + private maxSize: number + private minSize: number + private idleTimeoutMillis?: number + private spawnWorker: () => Promise + private idleCleanupHandle: NodeJS.Timeout | undefined constructor( spawnWorker: () => Promise, @@ -134,13 +151,26 @@ class WorkerPool implements Pool { : optionsOrSize || {} const { size = defaultPoolSize } = options + this.maxSize = size + this.minSize = options.minSize ?? 0 + this.idleTimeoutMillis = options.idleTimeoutMillis + this.spawnWorker = spawnWorker this.debug = DebugLogger(`threads:pool:${slugify(options.name || String(nextPoolID++))}`) this.options = options - this.workers = spawnWorkers(spawnWorker, size) + + const initialWorkers = this.idleTimeoutMillis ? this.minSize : size + this.workers = spawnWorkers(spawnWorker, initialWorkers) this.eventObservable = multicast(Observable.from(this.eventSubject)) + if (this.idleTimeoutMillis) { + this.idleCleanupHandle = setInterval( + () => this.purgeExpiredWorkers(), + options.idleCleanupIntervalMillis ?? 30 * 1000 + ) + } + Promise.all(this.workers.map(worker => worker.init)).then( () => this.eventSubject.next({ type: PoolEventType.initialized, @@ -191,6 +221,8 @@ class WorkerPool implements Pool { } private async run(worker: WorkerDescriptor, task: QueuedTask) { + worker.lastActivity = Date.now() + const runPromise = (async () => { const removeTaskFromWorkersRunningTasks = () => { worker.runningTasks = worker.runningTasks.filter(someRunPromise => someRunPromise !== runPromise) @@ -202,6 +234,7 @@ class WorkerPool implements Pool { try { await this.runPoolTask(worker, task) } finally { + worker.lastActivity = Date.now() removeTaskFromWorkersRunningTasks() if (!this.isClosing) { @@ -213,12 +246,35 @@ class WorkerPool implements Pool { worker.runningTasks.push(runPromise) } + private async purgeExpiredWorkers() { + const idleWorkers: WorkerDescriptor[] = this.workers.filter( + w => w.lastActivity < Date.now() - this.idleTimeoutMillis! + ) + const rmWorkers: WorkerDescriptor[] = [] + while (idleWorkers.length && this.workers.length > this.minSize) { + const rmWorker = idleWorkers.shift()! + rmWorkers.push(rmWorker) + const idx = this.workers.findIndex(w => w === rmWorker) + this.workers.splice(idx, 1) + } + + Promise.all(rmWorkers.map( + async (w) => Thread.terminate(await w.init) + )) + } private scheduleWork() { this.debug(`Attempt de-queueing a task in order to run it...`) - const availableWorker = this.findIdlingWorker() - if (!availableWorker) return + let availableWorker = this.findIdlingWorker() + if (!availableWorker) { + if (this.workers.length < this.maxSize) { + [availableWorker] = spawnWorkers(this.spawnWorker, 1) + this.workers.push(availableWorker) + } else { + return + } + } const nextTask = this.taskQueue.shift() if (!nextTask) { @@ -371,6 +427,10 @@ class WorkerPool implements Pool { public async terminate(force?: boolean) { this.isClosing = true + if (this.idleCleanupHandle) { + clearInterval(this.idleCleanupHandle!) + } + if (!force) { await this.completed(true) }