diff --git a/src/master/pool-types.ts b/src/master/pool-types.ts index 83c1f4fd..ae9ec71d 100644 --- a/src/master/pool-types.ts +++ b/src/master/pool-types.ts @@ -48,6 +48,7 @@ export type PoolEvent = { export interface WorkerDescriptor { init: Promise runningTasks: Array> + lastActivity: number } /** diff --git a/src/master/pool.ts b/src/master/pool.ts index b0a6ff48..d646fd37 100644 --- a/src/master/pool.ts +++ b/src/master/pool.ts @@ -48,10 +48,13 @@ function spawnWorkers( spawnWorker: () => Promise, count: number ): Array> { - return createArray(count).map((): WorkerDescriptor => ({ - init: spawnWorker(), - runningTasks: [] - })) + return createArray(count).map( + (): WorkerDescriptor => ({ + init: spawnWorker(), + runningTasks: [], + lastActivity: Date.now() + }) + ) } /** @@ -109,6 +112,15 @@ export interface PoolOptions { /** No. of worker threads to spawn and to be managed by the pool. */ size?: number + + /** For dynamically sized pool, the minimum number of active workers. */ + minSize?: number + + /** For dynamically sized pool, maximum idle lifetime of worker threads. */ + idleTimeoutMillis?: number + + /** For dynamically sized pool, frequency of cleaning up idle threads. */ + idleCleanupIntervalMillis?: number } class WorkerPool implements Pool { @@ -124,6 +136,11 @@ class WorkerPool implements Pool { private isClosing = false private nextTaskID = 1 private taskQueue: Array> = [] + private maxSize: number + private minSize: number + private idleTimeoutMillis?: number + private spawnWorker: () => Promise + private idleCleanupHandle: NodeJS.Timeout | undefined constructor( spawnWorker: () => Promise, @@ -134,13 +151,26 @@ class WorkerPool implements Pool { : optionsOrSize || {} const { size = defaultPoolSize } = options + this.maxSize = size + this.minSize = options.minSize ?? 0 + this.idleTimeoutMillis = options.idleTimeoutMillis + this.spawnWorker = spawnWorker this.debug = DebugLogger(`threads:pool:${slugify(options.name || String(nextPoolID++))}`) this.options = options - this.workers = spawnWorkers(spawnWorker, size) + + const initialWorkers = this.idleTimeoutMillis ? this.minSize : size + this.workers = spawnWorkers(spawnWorker, initialWorkers) this.eventObservable = multicast(Observable.from(this.eventSubject)) + if (this.idleTimeoutMillis) { + this.idleCleanupHandle = setInterval( + () => this.purgeExpiredWorkers(), + options.idleCleanupIntervalMillis ?? 30 * 1000 + ) + } + Promise.all(this.workers.map(worker => worker.init)).then( () => this.eventSubject.next({ type: PoolEventType.initialized, @@ -191,6 +221,8 @@ class WorkerPool implements Pool { } private async run(worker: WorkerDescriptor, task: QueuedTask) { + worker.lastActivity = Date.now() + const runPromise = (async () => { const removeTaskFromWorkersRunningTasks = () => { worker.runningTasks = worker.runningTasks.filter(someRunPromise => someRunPromise !== runPromise) @@ -202,6 +234,7 @@ class WorkerPool implements Pool { try { await this.runPoolTask(worker, task) } finally { + worker.lastActivity = Date.now() removeTaskFromWorkersRunningTasks() if (!this.isClosing) { @@ -213,12 +246,35 @@ class WorkerPool implements Pool { worker.runningTasks.push(runPromise) } + private async purgeExpiredWorkers() { + const idleWorkers: WorkerDescriptor[] = this.workers.filter( + w => w.lastActivity < Date.now() - this.idleTimeoutMillis! + ) + const rmWorkers: WorkerDescriptor[] = [] + while (idleWorkers.length && this.workers.length > this.minSize) { + const rmWorker = idleWorkers.shift()! + rmWorkers.push(rmWorker) + const idx = this.workers.findIndex(w => w === rmWorker) + this.workers.splice(idx, 1) + } + + Promise.all(rmWorkers.map( + async (w) => Thread.terminate(await w.init) + )) + } private scheduleWork() { this.debug(`Attempt de-queueing a task in order to run it...`) - const availableWorker = this.findIdlingWorker() - if (!availableWorker) return + let availableWorker = this.findIdlingWorker() + if (!availableWorker) { + if (this.workers.length < this.maxSize) { + [availableWorker] = spawnWorkers(this.spawnWorker, 1) + this.workers.push(availableWorker) + } else { + return + } + } const nextTask = this.taskQueue.shift() if (!nextTask) { @@ -371,6 +427,10 @@ class WorkerPool implements Pool { public async terminate(force?: boolean) { this.isClosing = true + if (this.idleCleanupHandle) { + clearInterval(this.idleCleanupHandle!) + } + if (!force) { await this.completed(true) }