Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing tests for the entire library #4

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/node.js.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ jobs:
- run: npm ci
- run: npm run build --if-present
- run: npm test
- run: npm run make-badges
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ npm-debug.log*
yarn-debug.log*
yarn-error.log*
# Ignore compiled TypeScript files
dist/
dist/
# Ignore coverage files
coverage/
3 changes: 2 additions & 1 deletion .npmignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ yarn-debug.log*
yarn-error.log*
#dev folders
.github/
tests/
lib/tests/
# Ignore coverage files
coverage/
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# workers-pool
Creating truly asynchronus functions has never been easier!

Creating truly asynchronus functions has never been easier!

![npm](https://img.shields.io/npm/dt/workers-pool)
![NPM](https://img.shields.io/npm/l/workers-pool)
![npm](https://img.shields.io/npm/v/workers-pool)

![Coverage](https://img.shields.io/badge/lines-40.25%25-red.svg)

The `workers-pool` package allows you to easily create a pool of workers, pass them
some heavy tasks in the form of functions, and use the generated async function as
asynchronous Promise-based functions.
Expand Down
16 changes: 16 additions & 0 deletions jest.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/** @type {import('@ts-jest/dist/types').InitialOptionsTsJest} */
module.exports = {
collectCoverage: true,
detectOpenHandles: true,
coverageReporters: ["json-summary", "text"],
roots: [
"./lib/tests"
],
testMatch: [
"**/__tests__/**/*.+(ts|tsx|js)",
"**/?(*.)+(spec|test).+(ts|tsx|js)"
],
transform: {
"^.+\\.(ts|tsx)$": "ts-jest"
},
}
63 changes: 35 additions & 28 deletions lib/Pool.ts → lib/src/Pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import path from 'path';
import getCallerFile from 'get-caller-file';
import { isMainThread } from 'worker_threads';

import { Task } from "./task";
import { Task } from "./Task";
import { TaskWorker } from "./TaskWorker";
import { genetateScript } from './ScriptGenerator';

Expand All @@ -19,15 +19,13 @@ interface TaskRunner {
functionName?: string;
filePath?: string;
threadCount?: number;
lockToThreads?: boolean;
static?: boolean;
}

interface WorkersPoolOptions {
taskRunners?: Array<TaskRunner>; // An array of all the taskRunners for the pool
totalThreadCount?: number; // The total number of threads wanted
lockTaskRunnersToThreads?: boolean; // Whether or not to have dedicated threads for the taskRunners
allowDynamicTaskRunnerAddition?: boolean; // Whether or not to allow adding more task runners
threadCount: number;
threadCount?: number;
}

export class Pool{
Expand All @@ -49,7 +47,7 @@ export class Pool{
* @param {number} n The number of threads (default is the number of cpu cores - 1)
* @param {WorkersPoolOptions} options The optional options used in creating workers
*/
constructor(options: WorkersPoolOptions){
constructor(options?: WorkersPoolOptions){
if (isMainThread) {
this.workersPool = new Map(); // contains the idle workers
this.busyWorkers = new Map(); // contains the busy workers (processing code)
Expand All @@ -59,7 +57,7 @@ export class Pool{
this.dynamicTaskRunnerList = new Array();
this.busyWorkersCount = 0;

this.options = options;
this.options = options?options:{};
this.processingInterval = null;
this.intervalLength = 1;
this.staticTaskRunnerThreadCount = 0;
Expand All @@ -84,13 +82,13 @@ export class Pool{
let functionName = this.options.taskRunners[i].job.name;
let name = this.options.taskRunners[i].name;
let threadCount = this.options.taskRunners[i].threadCount;
let lockToThreads = this.options.lockTaskRunnersToThreads;
let lockToThreads = this.options.taskRunners[i].static;
totalStaticThreads += threadCount;
this._addTaskRunner({name, threadCount, lockToThreads, filePath, functionName});
}

// Make all others dynamic
for (let k = 0; k < (this.options.totalThreadCount - totalStaticThreads); k++) {
for (let k = 0; k < (this.options.threadCount - totalStaticThreads); k++) {
let _worker = new TaskWorker(genetateScript(DYNAMIC), {eval: true});
_worker.busy = false;
_worker.id = i;
Expand All @@ -109,19 +107,25 @@ export class Pool{
let threadCountOfTaskRunners = 0;

if (this.options.taskRunners) {
this.options.taskRunners.map((taskRunner) => {
// Static task runners
this.options.taskRunners.filter(tr => tr.static).map((taskRunner) => {
if (!taskRunner.name) {
throw new Error("Every task runner should have a name");
throw new Error("Every task runner should have a name!");
}

if (!taskRunner.threadCount) {
taskRunner.threadCount = Math.floor(this.options.totalThreadCount/this.options.taskRunners.length);
console.warn(`The task ${taskRunner.name} has no thread count specified;
therefore, ${taskRunner.threadCount} is assigned to it`)
throw new Error(`The task runner ${taskRunner.name} has no threadCount specified`);
}

threadCountOfTaskRunners += taskRunner.threadCount;
});
});

// Dynamic task runners
this.options.taskRunners.filter(tr => !tr.static).map((taskRunner) => {
if (!taskRunner.name) {
throw new Error("Every task runner should have a name!");
}
});
}

if (threadCountOfTaskRunners > this.options.threadCount) {
Expand All @@ -132,16 +136,19 @@ export class Pool{
this.options.threadCount = threadCountOfTaskRunners;
}

if (this.options.threadCount < 1) {
throw new Error('threadCount cannot be less than 1');
}


if (!this.options.threadCount) {
this.options.threadCount = CPU_CORES_NO - 1;
if (CPU_CORES_NO < 1) {
console.warn('Could not read the number of cpu cores!');
} else {
this.options.threadCount = CPU_CORES_NO - 1;
}
}

if (!this.options.lockTaskRunnersToThreads) {
this.options.lockTaskRunnersToThreads = true;
if (this.options.threadCount < 1) {
throw new Error(`threadCount cannot be less than 1. Normally if not specified the cpu cores - 1
is used, but it seems like either you have only 1 core or we could not read the
number of cores you have`);
}

if (!this.options.allowDynamicTaskRunnerAddition) {
Expand All @@ -155,14 +162,14 @@ export class Pool{
*/
private _addTaskRunner({name, threadCount, lockToThreads, filePath, functionName}) {
if (lockToThreads) {
if (!threadCount || threadCount > this.options.totalThreadCount - this.staticTaskRunnerThreadCount) {
if (!threadCount || threadCount > this.options.threadCount - this.staticTaskRunnerThreadCount) {
if (this.dynamicTaskRunnerList.length > 0) {
threadCount = this.options.totalThreadCount - this.staticTaskRunnerThreadCount - 1;
threadCount = this.options.threadCount - this.staticTaskRunnerThreadCount - 1;

if (threadCount === 0)
throw new Error('There are no enough free threads');
} else {
threadCount = this.options.totalThreadCount - this.staticTaskRunnerThreadCount;
threadCount = this.options.threadCount - this.staticTaskRunnerThreadCount;
}

this.staticTaskRunnerThreadCount += threadCount;
Expand Down Expand Up @@ -192,7 +199,7 @@ export class Pool{
this.workersPool.get(name).push(_worker);
}
} else {
if (this.staticTaskRunnerThreadCount === this.options.totalThreadCount) {
if (this.staticTaskRunnerThreadCount === this.options.threadCount) {
throw new Error('There are no enough free threads');
}

Expand All @@ -205,7 +212,7 @@ export class Pool{
* @param taskRunner
*/
public addTaskRunner(taskRunner: TaskRunner) {
let {name, job, threadCount, lockToThreads} = taskRunner;
let {name, job, threadCount, static: lockToThreads} = taskRunner;
let filePath = getCallerFile();
let functionName = job.name;

Expand Down Expand Up @@ -269,7 +276,7 @@ export class Pool{
this.stopProcessing();
} else {
for (let task of this.taskQueue) {
if (this.busyWorkersCount !== this.options.totalThreadCount) {
if (this.busyWorkersCount !== this.options.threadCount) {
// remove a free worker from the beginings of the array
if (!this.workersPool.get(task.taskRunnerName)) {
let taskRunnerInfo = this.dynamicTaskRunnerList.find(dynamicTaskRunner => dynamicTaskRunner.name === task.taskRunnerName);
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion lib/Task.ts → lib/src/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ export class Task {
this.params = params;
this.resolveCallback = resolveCallback;
this.rejectCallback = rejectCallback;
this.key = counter ? counter++ : 0;
this.key = counter++;
}
}
2 changes: 1 addition & 1 deletion lib/TaskWorker.ts → lib/src/TaskWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export class TaskWorker extends Worker {
if (response.type == 'success')
this.resolveCallback({task: this.task, worker: this, result: response.value});
else if (response.type == 'error')
this.rejectCallback({task: this.task, worker: this, result: response.value});
this.rejectCallback({task: this.task, worker: this, error: response.value});

this.clenUp();
});
Expand Down
32 changes: 32 additions & 0 deletions lib/tests/Pool.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import {Pool} from '../src/Pool';
import isMainThread from 'worker_threads';

test('Instantiating pool object', ()=>{
let testPool = new Pool();
expect(testPool).toBeInstanceOf(Pool);
testPool.terminate(true);
});

test('Error if a task runner have no name', () => {
// Some function to be made asynchronous
function add (a, b) {
return a + b;
}

function sub (a, b) {
return a - b;
}

module.exports.add = add;
module.exports.sub = sub;

const testPool = new Pool({
taskRunners: [
{name: 'add', job: add, threadCount: 4, static: true},
{name: 'sub', job: sub, threadCount: 4, static: true},
],
threadCount: 8,
});

testPool.terminate(true);
});
74 changes: 74 additions & 0 deletions lib/tests/ScriptGenerator.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { genetateScript } from '../src/ScriptGenerator';

test('generateScript - dynamic script', () => {
let expectedDynamicScript, actualDynamicScript;

expectedDynamicScript = `
const {parentPort} = require('worker_threads');

parentPort.on('message', async (args) => {
// Require and call the function for this specific task
var response = {'type': 'success', 'value': undefined};
try {
response.value = require(args.filePath)[args.functionName](...args.params);

// If the result is a Promise resolve it
if ( Promise.resolve(response.value) == response.value) {
try{
response.value = await response.value;
} catch(error){
response.type = 'error';
response.value = error;
}
}
} catch (error) {
response.type = 'error';
response.value = error;
}

// Send the results back
parentPort.postMessage(response);
});
`.replace(/\s+|\r?\n|\r/g, '');

actualDynamicScript = genetateScript('dynamic').replace(/\s+|\r?\n|\r/g, '');

expect(actualDynamicScript).toBe(expectedDynamicScript);
});

test('generateScript - dynamic script', () => {
let expectedDynamicScript, actualDynamicScript;

expectedDynamicScript = `
const {parentPort} = require('worker_threads');
const processingFunction = require("D:\\a\\b\\c")["abc"];

parentPort.on('message', async (args) => {
// Require and call the function for this specific task
var response = {'type': 'success', 'value': undefined};
try {
response.value = processingFunction(...args.params);

// If the result is a Promise resolve it
if ( Promise.resolve(response.value) == response.value) {
try{
response.value = await response.value;
} catch(error){
response.type = 'error';
response.value = error;
}
}
} catch (error) {
response.type = 'error';
response.value = error;
}

// Send the results back
parentPort.postMessage(response);
});
`.replace(/\s+|\r?\n|\r/g, '');

actualDynamicScript = genetateScript('static', 'D:\\a\\b\\c', 'abc').replace(/\s+|\r?\n|\r/g, '');

expect(actualDynamicScript).toBe(expectedDynamicScript);
});
18 changes: 18 additions & 0 deletions lib/tests/Task.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Task } from '../src/Task';

test('Task constructor', () => {
let resolveCallback = () => 'hello';
let rejectCallback = () => 'bye';
let testTask = new Task('TR', [1, 'a', {A: 1, B: '1'}, true], resolveCallback, rejectCallback, 'FCN', 'D:\\a\\b\\c');
let testTask2 = new Task('TR', [], () => {}, () => {}, 'FCN', 'D:\\a\\b\\c');

expect(testTask).toBeInstanceOf(Task);
expect(testTask.taskRunnerName).toBe('TR');
expect(testTask.params).toEqual([1, 'a', {A: 1, B: '1'}, true]);
expect(JSON.stringify(testTask.resolveCallback)).toBe(JSON.stringify(resolveCallback));
expect(JSON.stringify(testTask.rejectCallback)).toBe(JSON.stringify(rejectCallback));
expect(testTask.functionName).toBe('FCN');
expect(testTask.filePath).toBe('D:\\a\\b\\c');
expect(testTask.key).toBe(0);
expect(testTask2.key).toBe(1);
});
Loading