Skip to content

Commit

Permalink
async worker initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
Rafal Augustyniak authored and rafixer committed Nov 26, 2020
1 parent 70a526e commit 583a1df
Show file tree
Hide file tree
Showing 16 changed files with 92 additions and 19 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.3.0] - 2020-11-24

### Added

- new options `asyncWorkerInitialization` that enable async initialization of worker

## [2.2.1] - 2020-09-17

### Changed
Expand Down
12 changes: 11 additions & 1 deletion Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ Describes a WorkerNodes options.
* [WorkerNodesOptions](#WorkerNodesOptions)
* [.autoStart](#WorkerNodesOptions+autoStart) : <code>Boolean</code>
* [.lazyStart](#WorkerNodesOptions+lazyStart) : <code>Boolean</code>
* [.asyncWorkerInitialization](#WorkerNodesOptions+asyncWorkerInitialization) : <code>Boolean</code>
* [.minWorkers](#WorkerNodesOptions+minWorkers) : <code>Number</code>
* [.maxWorkers](#WorkerNodesOptions+maxWorkers) : <code>Number</code>
* [.maxTasks](#WorkerNodesOptions+maxTasks) : <code>Number</code>
Expand All @@ -119,6 +120,15 @@ If true, depending on the [lazyStart](#WorkerNodesOptions+lazyStart) option, it
### options.lazyStart : <code>Boolean</code>
Whether should start a new worker only if all the others are busy.

**Kind**: instance property of [<code>WorkerNodesOptions</code>](#WorkerNodesOptions)
**Default**: <code>false</code>
<a name="WorkerNodesOptions+asyncWorkerInitialization"></a>

### options.asyncWorkerInitialization : <code>Boolean</code>
Enables async initialization of worker.
To start handling task over worker, need to invoke `sendWorkerMessage('ready')` function when it fully initialized.
For examples please refer to [the test cases](https://github.com/allegro/node-worker-nodes/blob/master/e2e/async-initialization.spec.js)

**Kind**: instance property of [<code>WorkerNodesOptions</code>](#WorkerNodesOptions)
**Default**: <code>false</code>
<a name="WorkerNodesOptions+minWorkers"></a>
Expand Down Expand Up @@ -199,7 +209,7 @@ const myModuleWorkerNodes = new WorkerNodes('/home/joe.doe/workspace/my-module')
myModuleWorkerNodes.call().then(msg => console.log(msg)); // -> 'hello from separate process!'
```

For more advanced examples please refer to [the test cases](https://github.com/allegro/node-worker-nodes/blob/master/tests/e2e.spec.js).
For more advanced examples please refer to [the test cases](https://github.com/allegro/node-worker-nodes/tree/master/e2e).


## Running tests
Expand Down
2 changes: 1 addition & 1 deletion docs-src/Readme.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const myModuleWorkerNodes = new WorkerNodes('/home/joe.doe/workspace/my-module')
myModuleWorkerNodes.call().then(msg => console.log(msg)); // -> 'hello from separate process!'
```

For more advanced examples please refer to [the test cases](https://github.com/allegro/node-worker-nodes/blob/master/tests/e2e.spec.js).
For more advanced examples please refer to [the test cases](https://github.com/allegro/node-worker-nodes/tree/master/e2e).


## Running tests
Expand Down
31 changes: 31 additions & 0 deletions e2e/async-initialization.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
const test = require('ava');

const WorkerNodes = require('../');
const { fixture } = require('./utils');

test('should not mark worker as ready until module fully initialized', t => {
// given
const workerNodes = new WorkerNodes(fixture('async-initialization'), {
maxWorkers: 1,
asyncWorkerInitialization: true,
autoStart: true
});

// then
t.falsy(workerNodes.pickWorker());
});

test('should correctly handle task after initialization', async t => {
// given
const workerNodes = new WorkerNodes(fixture('async-initialization'), {
maxWorkers: 1,
asyncWorkerInitialization: true,
autoStart: true
});

// when
const result = await workerNodes.call.result();

// then
t.is(result, 'result');
});
2 changes: 1 addition & 1 deletion e2e/call-timeout.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ test('should result with rejection of all the calls that the worker was processi
// when

const failingCall = workerNodes.call.task500ms().catch(error => error);
await wait(150);
await wait(200);

const secondCall = workerNodes.call.task100ms().catch(error => error);
const results = await Promise.all([failingCall, secondCall]);
Expand Down
12 changes: 12 additions & 0 deletions e2e/fixtures/async-initialization.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
const sendWorkerMessage = require('../../lib/util/send-worker-message');

let result = '';

setTimeout(() => {
result = 'result';
sendWorkerMessage('ready');
}, 200);

module.exports = {
result: () => result,
};
2 changes: 2 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
interface Options {
autoStart?: boolean;
lazyStart?: boolean;
asyncWorkerInitialization?: boolean;
minWorkers?: number;
maxWorkers?: number;
maxTasks?: number;
Expand All @@ -17,6 +18,7 @@ interface WorkerNodesInstance {
terminate: () => Promise<WorkerNodesInstance>;
profiler: (duration?: number) => void;
takeSnapshot: () => void;
getUsedWorkers: () => Array<Worker>;
}

interface CallProperty {
Expand Down
3 changes: 2 additions & 1 deletion lib/index.js
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
module.exports = require('./pool');
module.exports = require('./pool');
module.exports.sendWorkerMessage = require('./util/send-worker-message');
14 changes: 13 additions & 1 deletion lib/options.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class WorkerNodesOptions {
constructor({
autoStart = false,
lazyStart = false,
asyncWorkerInitialization = false,
minWorkers = 0,
maxWorkers = system.cpus().length,
maxTasks = Infinity,
Expand Down Expand Up @@ -37,6 +38,16 @@ class WorkerNodesOptions {
*/
this.lazyStart = Boolean(lazyStart);

/**
* Enables async initialization of worker.
* To start handling task over worker, need to invoke `sendWorkerMessage('ready')` function when it fully initialized.
* For examples please refer to [the test cases](https://github.com/allegro/node-worker-nodes/blob/master/e2e/async-initialization.spec.js)
*
* @type {Boolean}
* @default false
*/
this.asyncWorkerInitialization = Boolean(asyncWorkerInitialization);

/**
* The minimum number of workers that needs to be running to consider the whole pool as operational.
*
Expand Down Expand Up @@ -127,7 +138,8 @@ class WorkerNodesOptions {
srcFilePath,
maxTasks: this.maxTasksPerWorker,
endurance: this.workerEndurance,
stopTimeout: this.workerStopTimeout
stopTimeout: this.workerStopTimeout,
asyncWorkerInitialization: this.asyncWorkerInitialization
}
}
}
Expand Down
5 changes: 0 additions & 5 deletions lib/status.js

This file was deleted.

3 changes: 3 additions & 0 deletions lib/util/send-worker-message.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
const { parentPort } = require('worker_threads');

module.exports = (message) => parentPort.postMessage(message);
4 changes: 2 additions & 2 deletions lib/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const ProcessResponse = messages.Response;
const workerSerial = new Sequence(1);

class Worker extends EventEmitter {
constructor({ srcFilePath, maxTasks, endurance, stopTimeout }) {
constructor({ srcFilePath, maxTasks, endurance, stopTimeout, asyncWorkerInitialization }) {
super();

this.id = workerSerial.nextValue();
Expand All @@ -21,7 +21,7 @@ class Worker extends EventEmitter {
this.isTerminating = false;
this.isProcessAlive = false;

const process = this.process = new WorkerProcess(srcFilePath, { stopTimeout });
const process = this.process = new WorkerProcess(srcFilePath, { stopTimeout, asyncWorkerInitialization });

process.once('ready', () => {
this.isProcessAlive = true;
Expand Down
6 changes: 4 additions & 2 deletions lib/worker/child-loader.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ const HeapProfiler = require('../util/heap-profiler');

let $module;

function setupModule({ modulePath }) {
function setupModule({ modulePath, asyncWorkerInitialization }) {
// load target module
$module = require(modulePath);

// setup data channel
parentPort.on('message', callSplitter);

// report readiness
parentPort.postMessage('ready');
if (!asyncWorkerInitialization) {
parentPort.postMessage('ready');
}
}

function callSplitter(requestData) {
Expand Down
5 changes: 2 additions & 3 deletions lib/worker/process.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ const worker = require('worker_threads');
const EventEmitter = require('events');

class WorkerProcess extends EventEmitter {
constructor(modulePath, { stopTimeout }) {
constructor(modulePath, { asyncWorkerInitialization }) {
super();

const child = new worker.Worker(require.resolve('./child-loader'));
Expand All @@ -20,15 +20,14 @@ class WorkerProcess extends EventEmitter {
});

this.child = child;
this.stopTimeout = stopTimeout;

child.on('error', error => console.error(error));

// this instance is not usable from this moment, so forward the exit event
child.on('exit', code => this.emit('exit', code));

// pass all the information needed to spin up the child process
child.postMessage({ cmd: 'start', data: { modulePath } });
child.postMessage({ cmd: 'start', data: { modulePath, asyncWorkerInitialization } });
}

/**
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "worker-nodes",
"version": "2.2.1",
"version": "2.3.0",
"description": "A library to run cpu-intensive tasks without blocking the event loop.",
"keywords": [
"workers",
Expand Down

0 comments on commit 583a1df

Please sign in to comment.