Skip to content

Commit

Permalink
#9 start the worker/pool class
Browse files Browse the repository at this point in the history
  • Loading branch information
ichiriac committed Dec 22, 2016
1 parent 592aa03 commit 9550752
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 0 deletions.
2 changes: 2 additions & 0 deletions bin/worker
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/usr/bin/env node
'use strict';
108 changes: 108 additions & 0 deletions src/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*!
* Copyright (C) 2016 Glayzzle (BSD3 License)
* @authors https://github.com/glayzzle/php-reflection/graphs/contributors
* @url http://glayzzle.com
*/
var cp = require('child_process');
var numCPUs = require('os').cpus().length;
var workers = [];

/**
* @constructor
*/
var Worker = function worker(script) {
this.jobs = {};
this.length = 0;
this.script = script;
this.ready = true;
this.restart();
};

/**
* Worker is down and may be restarted
*/
Worker.prototype.restart = function(code) {
if (this.ready) {
this.ready = false;
if (this.child) {
this.child.removeAllListeners('error');
this.child.removeAllListeners('close');
this.child.removeAllListeners('exit');
// kill it (in case of hanging)
this.child.kill('SIGKILL');
}
this.child = cp.fork(this.script);
this.child.on('error', this.restart.bind(this));
this.child.on('close', this.restart.bind(this));
this.child.on('exit', this.restart.bind(this));
this.child.on('message', this.message.bind(this));
this.ready = true;

// retry pending scripts
if (this.length > 0) {
for(var k in this.jobs) {
this.child.send({
filename: k,
crc32: this.jobs[k].crc32
});
}
}
}
};

/**
* Receives the result
*/
Worker.prototype.message = function(data) {
var filename = data.filename;
if (this.jobs.hasOwnProperty(filename)) {
var promise = this.jobs[filename];
delete this.jobs[filename];
this.length--;
if (data.error) {
promise.reject(data.error);
} else {
promise.done(data);
}
}
};

/**
* Runs a process
*/
Worker.prototype.process = function(filename, crc32) {
if (!this.jobs.hasOwnProperty(filename)) {
this.length ++;
var self = this;
this.jobs[filename] = new Promise(function(done, reject) {
self.jobs[filename].done = done;
self.jobs[filename].reject = reject;
self.jobs[filename].crc32 = crc32;
self.child.send({
filename: filename,
crc32: crc32
});
});
}
return this.jobs[filename];
};

for(var i = 0; i < numCPUs; i++) {
workers.push(new Worker(__dirname + '/../bin/worker'));
}

/**
* Launch the parsing of the specified file
* @param {String} filename
* @param {String} crc32
* @return {Promise}
*/
module.exports = function(filename, crc32) {
var w = workers[0];
for(var i = 1; i < workers.length; i++) {
if (workers[i].ready && workers[i].length < w.length) {
w = workers[i];
}
}
return w.process(filename, crc32);
};

0 comments on commit 9550752

Please sign in to comment.