diff --git a/README.md b/README.md index 471c5b4..41faef6 100644 --- a/README.md +++ b/README.md @@ -22,8 +22,9 @@ Run multiple servers.js for round-robin shared. ###server.js example var rpc = require('amqp-rpc').factory({ - url: "amqp://guest:guest@localhost:5672" - }); + exchange: "uuu-test", exchange_options: {exclusive: false, autoDelete: true}, + conn_options: {url: "amqp://guest:guest@rabbitmq:5672", heartbeat: 10} + }); rpc.on('inc', function(param, cb){ @@ -58,8 +59,9 @@ Run multiple servers.js for round-robin shared. ###client.js example var rpc = require('amqp-rpc').factory({ - url: "amqp://guest:guest@localhost:5672" - }); + exchange: "uuu-test", exchange_options: {exclusive: false, autoDelete: true}, + conn_options: {url: "amqp://guest:guest@rabbitmq:5672", heartbeat: 10} + }); rpc.call('inc', 5, function() { console.log('results of inc:', arguments); //output: [6,4,7] @@ -144,3 +146,22 @@ Results for three workers: host2:2822 uptime= 2279.16 seconds counter= 3 Eugene Demchenko aka Goldy skype demchenkoe email demchenkoev@gmail.com + + + +## Changelog + +0.2.4: + + * bugfix memory leak + +0.2.3: + + * support for other exchanges than rpc_exchange + + +0.2.2: + + * print channels fn + + diff --git a/example/broadcast/core.js b/example/broadcast/core.js index 70c65ab..f3ac873 100644 --- a/example/broadcast/core.js +++ b/example/broadcast/core.js @@ -1,5 +1,5 @@ var rpc = require('../../index').factory({ - url: "amqp://guest:guest@localhost:5672" + url: "amqp://tdev1:mq4tdev1@127.0.0.1:5672" }); var all_stats = {}; diff --git a/example/broadcast/worker.js b/example/broadcast/worker.js index f9a3c52..8070a88 100644 --- a/example/broadcast/worker.js +++ b/example/broadcast/worker.js @@ -3,11 +3,12 @@ var worker_name = os.hostname() + ':' + process.pid; var counter = 0; var rpc = require('../../index').factory({ - url: "amqp://guest:guest@localhost:5672" + url: "amqp://tdev1:mq4tdev1@127.0.0.1:5672" }); -rpc.onBroadcast('getWorkerStat', function(params, cb) { - if(params && params.type == 'fullStat') { +rpc.on('getWorkerStat', function (params, cb) { + console.log("getWorkerStat: " + worker_name); + if (params && params.type == 'fullStat') { cb(null, { pid: process.pid, hostname: os.hostname(), @@ -16,9 +17,9 @@ rpc.onBroadcast('getWorkerStat', function(params, cb) { }); } else { - cb(null, { counter: counter++ }) + cb(null, {counter: counter++}) } -}); +}, {queueName: "test-" + process.pid}); -rpc.call('log', { worker: worker_name, message: 'worker started' }); \ No newline at end of file +rpc.rpcCall('log', {worker: worker_name, message: 'worker started'}); \ No newline at end of file diff --git a/example/round-robin/client.js b/example/round-robin/client.js index ee59de2..cdfc1c8 100644 --- a/example/round-robin/client.js +++ b/example/round-robin/client.js @@ -1,18 +1,39 @@ +Object.defineProperty(Error, 'fromJSON', { + value: function (other) { + var err = new Error(); + Object.getOwnPropertyNames(other).forEach(function (key) { + err[key] = other[key]; + }); + return err; + }, + configurable: true +}); var rpc = require('../../index').factory({ - conn_options: { url: "amqp://guest:guest@localhost:5672", heartbeat: 10 } + exchange: "uuu-test", exchange_options: {exclusive: false, autoDelete: true}, + conn_options: {url: "amqp://guest:guest@rabbitmq:5672", heartbeat: 10, confirm: true} }); -rpc.call('inc', 5, function() { +rpc.rpcCall('inc', 5, null, null, function () { console.log('results of inc:', arguments); //output: [6,4,7] }); -rpc.call('say.Hello', { name: 'John' }, function(msg) { +rpc.rpcCall('say.Hello', {name: 'John'}, null, null, function (msg) { console.log('results of say.Hello:', msg); //output: Hello John! }); -rpc.call('withoutCB', {}, function(msg) { +rpc.rpcCall('withoutCB', {}, null, null, function (msg) { console.log('withoutCB results:', msg); //output: please run function without cb parameter }); -rpc.call('withoutCB', {}); //output message on server side console +//rpc.rpcCall('withoutCB', {}); //output message on server side console + +//rpc.rpcCall('errorFn', null, null, null, function (err, succ) { +// throw Error.fromJSON(err); +//}); + +//rpc.rpcCall('waitsTooMuch', null, null, console, console.log); +//rpc.rpcCall('waitsTooMuch', null, null, console, console.log); + +rpc.rpcCall('waitsTooMuch', null, {"expiration": "3000"}, console, console.log); +rpc.rpcCall('waitsTooMuch', null, {"expiration": "3000"}, console, console.log); diff --git a/example/round-robin/server.js b/example/round-robin/server.js index 8a1993a..d50c5ea 100644 --- a/example/round-robin/server.js +++ b/example/round-robin/server.js @@ -1,17 +1,30 @@ +Object.defineProperty(Error.prototype, 'toJSON', { + value: function () { + var alt = {}; + + Object.getOwnPropertyNames(this).forEach(function (key) { + alt[key] = this[key]; + }, this); + + return alt; + }, + configurable: true +}); + var rpc = require('../../index').factory({ - conn_options: { url: "amqp://guest:guest@localhost:5672", heartbeat: 10 } + exchange: "uuu-test", exchange_options: {exclusive: false, autoDelete: true}, + conn_options: {url: "amqp://guest:guest@rabbitmq:5672", heartbeat: 10} }); -rpc.on('inc', function(param, cb){ +rpc.subscribe('zzttrr', function(param, cb){ var prevVal = param; - var nextVal = param+2; + var nextVal = param + 2; cb(++param, prevVal, nextVal); -}); - -rpc.on('say.*', function(param, cb, inf){ +}, {queueName: "test_inc"}); +rpc.subscribe('say.*', function (param, cb, inf) { var arr = inf.cmd.split('.'); var name = (param && param.name) ? param.name : 'world'; @@ -20,13 +33,23 @@ rpc.on('say.*', function(param, cb, inf){ }); -rpc.on('withoutCB', function(param, cb, inf) { +rpc.subscribe('withoutCB', function (param, cb, inf) { + + if (cb) { + cb('please run function without cb parameter') + } + else { + console.log('this is function withoutCB'); + } - if(cb){ - cb('please run function without cb parameter') - } - else{ - console.log('this is function withoutCB'); - } +}); + +rpc.subscribe('errorFn', function (param, cb) { + cb(new Error("errorFn"), null); +}); +rpc.subscribe('waitsTooMuch', function (param, cb) { + console.log("waitsTooMuch"); + //cb("waitsTooMuch OK!"); + setTimeout(cb.bind(null, "waitsTooMuch OK!"), 5000); }); diff --git a/index.js b/index.js index 44c965f..1b4665a 100644 --- a/index.js +++ b/index.js @@ -1,22 +1,22 @@ - - var amqp = require('amqp'); var uuid = require('node-uuid').v4; -var os = require('os'); -var debug= require('debug')('amqp-rpc'); +var os = require('os'); var queueNo = 0; -function rpc(opt) { +var instanceId = 0; - if(!opt) opt = {}; +function rpc(opt) { + this.instanceId = instanceId++; + + if (!opt) opt = {}; this.opt = opt; - this.__conn = opt.connection ? opt.connection : null; - this.__url = opt.url ? opt.url: 'amqp://guest:guest@localhost:5672'; - this.__exchange = opt.exchangeInstance ? opt.exchangeInstance : null; - this.__exchange_name = opt.exchange ? opt.exchange : 'rpc_exchange'; - this.__exchange_options = opt.exchange_options ? opt.exchange_options : {exclusive: false, autoDelete: true }; - this.__impl_options = opt.ipml_options || {defaultExchangeName: this.__exchange_name}; - this.__conn_options = opt.conn_options || {}; + this.__conn = opt.connection ? opt.connection : null; + this.__url = opt.url ? opt.url : 'amqp://guest:guest@localhost:5672'; + this.__exchange = opt.exchangeInstance ? opt.exchangeInstance : null; + this.__exchange_name = opt.exchange ? opt.exchange : 'rpc_exchange'; + this.__exchange_options = opt.exchange_options ? opt.exchange_options : {exclusive: false, autoDelete: true}; + this.__impl_options = opt.ipml_options || {defaultExchangeName: this.__exchange_name}; + this.__conn_options = opt.conn_options || {}; this.__results_queue = null; this.__results_queue_name = null; @@ -34,22 +34,23 @@ function rpc(opt) { * @returns {string} */ -rpc.prototype.generateQueueName = function(type) { - return /*'njsListener:' +*/ os.hostname() + ':pid' + process.pid + ':' + type; +rpc.prototype.generateQueueName = function (type) { + return uuid() + ':' + os.hostname() + ':pid' + process.pid + ':' + type; } -rpc.prototype._connect = function(cb) { +rpc.prototype._connect = function (cb) { - if(!cb) { - cb = function(){}; + if (!cb) { + cb = function () { + }; } - if(this.__conn) { + if (this.__conn) { - if(this.__connCbs.length > 0) { + if (this.__connCbs.length > 0) { - this.__connCbs.push(cb); + this.__connCbs.push(cb); return true; } @@ -63,43 +64,50 @@ rpc.prototype._connect = function(cb) { this.__connCbs.push(cb); var options = this.__conn_options; - if(!options.url && !options.host) options.url = this.__url; - debug("createConnection options=", options, ', ipml_options=', this.__impl_options || {}); + if (!options.url && !options.host) options.url = this.__url; this.__conn = amqp.createConnection( - options, + options, this.__impl_options ); - this.__conn.on('ready', function() { - debug("connected to " + $this.__conn.serverProperties.product); - var cbs = $this.__connCbs; - $this.__connCbs = []; + this.__conn.on('error', function (err) { + throw err; + }); - for(var i=0; i< cbs.length; i++) { - cbs[i]($this.__conn); - } + this.__conn.on('ready', function () { + var cbs = $this.__connCbs; + $this.__connCbs = []; + + for (var i = 0; i < cbs.length; i++) { + cbs[i]($this.__conn); + } }); } /** * disconnect from MQ broker */ -rpc.prototype.disconnect = function() { - debug("disconnect()"); - if(!this.__conn) return; - this.__conn.end(); +rpc.prototype.disconnect = function () { + if (!this.__conn) return; + this.__conn.removeAllListeners('error'); + var self = this; + this.__conn.on('error', function () { + console.log('--> error disconnecting amqp-rpc #' + self.instanceId); + }); + this.__conn.disconnect(); this.__conn = null; -} +}; -rpc.prototype._makeExchange = function(cb) { +rpc.prototype._makeExchange = function (cb) { - if(!cb) { - cb = function(){}; + if (!cb) { + cb = function () { + }; } - if(this.__exchange) { + if (this.__exchange) { - if(this.__exchangeCbs.length > 0) { + if (this.__exchangeCbs.length > 0) { this.__exchangeCbs.push(cb); @@ -112,31 +120,31 @@ rpc.prototype._makeExchange = function(cb) { var $this = this; this.__exchangeCbs.push(cb); - /* - * Added option autoDelete=false. - * Otherwise we had an error in library node-amqp version > 0.1.7. - * Text of such error: "PRECONDITION_FAILED - cannot redeclare exchange '' in vhost '/' with different type, durable, internal or autodelete value" - */ - this.__exchange = this.__conn.exchange(this.__exchange_name, { autoDelete: false }, function(exchange) { - debug('Exchange ' + exchange.name + ' is open'); + /* + * Added option autoDelete=false. + * Otherwise we had an error in library node-amqp version > 0.1.7. + * Text of such error: "PRECONDITION_FAILED - cannot redeclare exchange '' in vhost '/' with different type, durable, internal or autodelete value" + */ + this.__exchange = this.__conn.exchange(this.__exchange_name, {autoDelete: false}, function (exchange) { var cbs = $this.__exchangeCbs; $this.__exchangeCbs = []; - for(var i=0; i< cbs.length; i++) { + for (var i = 0; i < cbs.length; i++) { cbs[i]($this.__exchange); } }); } -rpc.prototype._makeResultsQueue = function(cb) { +rpc.prototype._makeResultsQueue = function (cb) { - if(!cb) { - cb = function(){}; + if (!cb) { + cb = function () { + }; } - if(this.__results_queue) { - if(this.__make_results_cb.length > 0) { + if (this.__results_queue) { + if (this.__make_results_cb.length > 0) { this.__make_results_cb.push(cb); return true; @@ -149,23 +157,21 @@ rpc.prototype._makeResultsQueue = function(cb) { this.__results_queue_name = this.generateQueueName('callback'); this.__make_results_cb.push(cb); - $this._makeExchange(function() { + $this._makeExchange(function () { $this.__results_queue = $this.__conn.queue( $this.__results_queue_name, $this.__exchange_options, - function(queue) { - debug('Callback queue ' + queue.name + ' is open'); - queue.subscribe(function() { + function (queue) { + queue.subscribe(function () { $this.__onResult.apply($this, arguments); }); queue.bind($this.__exchange, $this.__results_queue_name); - debug('Bind queue ' + queue.name + ' to exchange ' + $this.__exchange.name); var cbs = $this.__make_results_cb; $this.__make_results_cb = []; - for(var i=0; i 0) { + console.log("======= Amqp monitoring: deleting preexisting bindings ======="); + } + + bindingsToDelete.forEach(function (b) { + console.log("deleting binding: ", b); + queue.unbind(b.exchange, b.routing); + }); + + if (bindingsToDelete.length > 0) { + console.log("=============================================================="); + } + + }); + }); +}; + /** * add new command handler * @param {string} cmd command name or match string @@ -287,30 +341,29 @@ rpc.prototype.call = function(cmd, params, cb, context, options) { */ -rpc.prototype.on = function(cmd, cb, context, options) { - debug('on(), routingKey=%s', cmd); - if(this.__cmds[ cmd ]) return false; +rpc.prototype.subscribeParallel = function (cmd, cb, options, context) { + if (this.__cmds[cmd]) return false; options || (options = {}); var $this = this; + options.queueName = options.queueName || cmd; - this._connect(function() { + this._connect(function () { - $this.__conn.queue(options.queueName || cmd, function(queue) { - $this.__cmds[ cmd ] = { queue: queue }; - queue.subscribe(function(message, d, headers, deliveryInfo) { + $this.__conn.queue(options.queueName, function (queue) { + $this.__cmds[cmd] = {queue: queue}; + queue.subscribe(function (message, d, headers, deliveryInfo) { var cmdInfo = { - cmd: deliveryInfo.routingKey, - exchange: deliveryInfo.exchange, + cmd: deliveryInfo.routingKey, + exchange: deliveryInfo.exchange, contentType: deliveryInfo.contentType, - size: deliveryInfo.size + size: deliveryInfo.size }; - if(deliveryInfo.correlationId && deliveryInfo.replyTo ) { - - return cb.call(context, message, function(err, data) { + if (deliveryInfo.correlationId && deliveryInfo.replyTo) { + return cb.call(context, message, function (err, data) { var options = { correlationId: deliveryInfo.correlationId } @@ -326,7 +379,21 @@ rpc.prototype.on = function(cmd, cb, context, options) { return cb.call(context, message, null, cmdInfo); }); - $this._makeExchange(function(){ + $this.printChannels(options.queueName); + + $this._getCurrentBindings(options.queueName, "%2f", function (bindings) { + var bindingsToDelete = bindings.map(function (b) { + return {exchange: b.source, routing: b.routing_key}; + }).filter(function (b) { + return b.exchange && b.exchange !== ''; + }).filter(function (b) { + return !(b.exchange === $this.__exchange_name && b.routing === cmd); + }); + + $this._deleteBindings(queue, bindingsToDelete); + }); + + $this._makeExchange(function () { queue.bind($this.__exchange, cmd); }); @@ -338,55 +405,132 @@ rpc.prototype.on = function(cmd, cb, context, options) { } /** - * remove command handler added with "on" method - * @param {string} cmd command or match string - * @return {boolean} + * add new command handler, it handles a message per time, at the end it sends the ack to rabbitMQ + * @param cmd + * @param cb + * @param context + * @param options + * @returns {boolean} */ - -rpc.prototype.off = function(cmd) { - debug('off', cmd); - if(!this.__cmds[ cmd ]) return false; +rpc.prototype.subscribe = function (cmd, cb, options, context) { + if (this.__cmds[cmd]) return false; + options || (options = {}); var $this = this; - var c = $this.__cmds[ cmd ]; - function unsubscribe(cb) { - if(c.ctag) - c.queue.unsubscribe(c.ctag); + options.queueName = options.queueName || cmd; - if(cb) - return cb(); - } + this._connect(function () { + $this.__conn.queue(options.queueName, function (queue) { + $this.__cmds[cmd] = {queue: queue}; - function unbind(cb) { + queue.subscribe({ + ack: true + }, function (message, d, headers, deliveryInfo) { - if(c.queue) { - unsubscribe(function() { - c.queue.unbind($this.__exchange, cmd); + var cmdInfo = { + cmd: deliveryInfo.routingKey, + exchange: deliveryInfo.exchange, + contentType: deliveryInfo.contentType, + size: deliveryInfo.size + }; + + if (deliveryInfo.correlationId && deliveryInfo.replyTo) { + + return cb.call(context, message, function (err, data) { + queue.shift(); + var options = { + correlationId: deliveryInfo.correlationId + } - if(cb) - return cb(); + $this.__exchange.publish( + deliveryInfo.replyTo, + Array.prototype.slice.call(arguments), + options + ); + }, cmdInfo); + } + else + return cb.call(context, message, null, cmdInfo); }); - } - } + $this.printChannels(options.queueName); - function destroy(cb) { + $this._getCurrentBindings(options.queueName, "%2f", function (bindings) { + var bindingsToDelete = bindings.map(function (b) { + return {exchange: b.source, routing: b.routing_key}; + }).filter(function (b) { + return b.exchange && b.exchange !== ''; + }).filter(function (b) { + return !(b.exchange === $this.__exchange_name && b.routing === cmd); + }); - if(c.queue){ - unbind(function(){ - c.queue.destroy() + $this._deleteBindings(queue, bindingsToDelete); + }); - if(cb) - return cb(); + $this._makeExchange(function () { + queue.bind($this.__exchange, cmd); }); - } - } - destroy(function(){ - delete $this.__cmds[ cmd ]; + }); }); + + return true; +} + +/** + * remove command handler added with "on" method + * @param {string} cmd command or match string + * @return {boolean} + */ + +rpc.prototype.off = function (cmd) { + if (!this.__cmds[cmd]) return false; + + var $this = this; + + var c = $this.__cmds[cmd]; + + function unsubscribe(cb) { + if (c.ctag) + c.queue.unsubscribe(c.ctag); + + if (cb) + return cb(); + } + + // other processes could remain bounded to the queue, so we cannot delete it + + //function unbind(cb) { + // + // if (c.queue) { + // unsubscribe(function () { + // c.queue.unbind($this.__exchange, cmd); + // + // if (cb) + // return cb(); + // }); + // + // } + //} + // + //function destroy(cb) { + // + // if (c.queue) { + // unbind(function () { + // c.queue.destroy() + // + // if (cb) + // return cb(); + // }); + // } + //} + + //destroy(function () { + delete $this.__cmds[cmd]; + //}); + return true; } @@ -398,50 +542,63 @@ rpc.prototype.off = function(cmd) { */ -rpc.prototype.callBroadcast = function(cmd, params, options) { +rpc.prototype.callBroadcast = function (cmd, params, options) { var $this = this; options || (options = {}); options.broadcast = true; - options.autoDeleteCallback = options.ttl ? false : true; - var corr_id = this.call.call(this, cmd, params, options.onResponse, options.context, options); - if(options.ttl) { - setTimeout(function() { + //options.autoDeleteCallback = options.ttl ? false : true; + var corr_id = this.rpcCall.call(this, cmd, params, options, options.context, options.onResponse); + if (options.ttl) { + setTimeout(function () { //release cb - if($this.__results_cb[ corr_id ]) { - delete $this.__results_cb[ corr_id ]; + if ($this.__results_cb[corr_id]) { + delete $this.__results_cb[corr_id]; } options.onComplete.call(options.context, cmd, options); }, options.ttl); } } -/** - * subscribe to broadcast commands - * @param {string} cmd - * @param {function} cb - * @param {object} context - */ - -rpc.prototype.onBroadcast = function (cmd, cb, context, options) { +rpc.prototype.printChannels = function (queueName) { + var thisRabbit = this; + thisRabbit._getChannels(queueName, function (channels) { + if (channels.length > 0) { - options || (options = {}); - options.queueName = this.generateQueueName('broadcast:q'+ (queueNo++) ); - return this.on.call(this, cmd, cb, context, options); -} + console.log("======= Amqp monitoring report ======="); + console.log("Channels in [" + queueName + "] queue: "); + channels.forEach(function (v) { + console.log("\t" + v); + }); -/** - * - * @type {Function} - */ + console.log("======================================"); -rpc.prototype.offBroadcast = rpc.prototype.off; + } + }); +}; + +rpc.prototype._getChannels = function (queueName, cb) { + var myRpc = new rpc({exchange: "rpc_exchange", conn_options: {url: this.opt.conn_options.url}}); + var routing = "rabbitmon"; + var message = {apiCall: "queues/" + "%2f" + "/" + queueName}; + myRpc.rpcCall(routing, message, {expiration: "20000"}, null, function (err, res) { + if (err) { + console.error(err); + throw err; + } + myRpc.disconnect(); + var ipList = res.consumer_details.map(function (consumer) { + return consumer.channel_details.connection_name; + }); + cb(ipList); + }); +}; module.exports.amqpRPC = rpc; -module.exports.factory = function(opt) { +module.exports.factory = function (opt) { return new rpc(opt); } diff --git a/package.json b/package.json index ff0a711..b12418c 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,7 @@ "amqp", "rpc" ], - "version": "0.0.8", + "version": "0.2.4", "preferGlobal": true, "author": { "name": "Eugene Demchenko" @@ -28,9 +28,8 @@ } ], "dependencies": { - "amqp": "0.2.0", - "node-uuid": "*", - "debug": "~0.7.2" + "amqp": "latest", + "node-uuid": "^1.4.3" }, "readme": "\n#AMQP-RPC\n\nRPC library based on AMQP protocol.\nTested with RabbitMQ on the highload project.\n\n\n###Install RabitMQ\n\n apt-get install rabbitmq-server\n\n###Install library\n\n npm install amqp-rpc\n\n##round-robin\n\nExample: Call remote function.\nRun multiple servers.js for round-robin shared.\n\n\n###server.js example\n\n var rpc = require('amqp-rpc').factory({\n url: \"amqp://guest:guest@localhost:5672\"\n });\n\n\n rpc.on('inc', function(param, cb){\n var prevVal = param;\n var nextVal = param+2;\n cb(++param, prevVal, nextVal);\n });\n\n rpc.on('say.*', function(param, cb, inf){\n\n var arr = inf.cmd.split('.');\n\n var name = (param && param.name) ? param.name : 'world';\n\n cb(arr[1] + ' ' + name + '!');\n\n });\n\n rpc.on('withoutCB', function(param, cb, inf) {\n\n if(cb){\n cb('please run function without cb parameter')\n }\n else{\n console.log('this is function withoutCB');\n }\n\n });\n\n\n\n###client.js example\n\n var rpc = require('amqp-rpc').factory({\n url: \"amqp://guest:guest@localhost:5672\"\n });\n\n rpc.call('inc', 5, function() {\n console.log('results of inc:', arguments); //output: [6,4,7]\n });\n\n rpc.call('say.Hello', { name: 'John' }, function(msg) {\n console.log('results of say.Hello:', msg); //output: Hello John!\n });\n\n rpc.call('withoutCB', {}, function(msg) {\n console.log('withoutCB results:', msg); //output: please run function without cb parameter\n });\n\n rpc.call('withoutCB', {}); //output message on server side console\n\n\n##broadcast\n\nExample: Core receiving data from all workers.\nRun multiple worker.js for broadcast witness.\nThe core.js must be launched after all worker.js instances.\n\n###example/broadcast/worker.js\n\n var os = require('os');\n var worker_name = os.hostname() + ':' + process.pid;\n var counter = 0;\n\n var rpc = require('../../index').factory({\n url: \"amqp://guest:guest@localhost:5672\"\n });\n\n rpc.onBroadcast('getWorkerStat', function(params, cb) {\n if(params && params.type == 'fullStat') {\n cb(null, {\n pid: process.pid,\n hostname: os.hostname(),\n uptime: process.uptime(),\n counter: counter++\n });\n }\n else {\n cb(null, { counter: counter++ })\n }\n });\n\n###example/broadcast/core.js\n\n var rpc = require('../../index').factory({\n url: \"amqp://guest:guest@localhost:5672\"\n });\n\n var all_stats = {};\n\n //rpc.callBroadcast() is rpc.call() + waiting multiple responses\n //If remote handler without response data, you can use rpc.call() for initiate broadcast calls.\n\n rpc.callBroadcast(\n 'getWorkerStat',\n { type: 'fullStat'}, //request parameters\n { //call options\n ttl: 1000, //wait response time (1 seconds), after run onComplete\n onResponse: function(err, stat) { //callback on each worker response\n all_stats[ stat.hostname+':'+ stat.pid ] = stat;\n\n },\n onComplete: function() { //callback on ttl expired\n console.log('----------------------- WORKER STATISTICS ----------------------------------------');\n for(var worker in all_stats) {\n s = all_stats[worker];\n console.log(worker, '\\tuptime=', s.uptime.toFixed(2) + ' seconds', '\\tcounter=', s.counter);\n }\n }\n });\n\n\nResults for three workers:\n\n ----------------------- WORKER STATISTICS ----------------------------------------\n host1:2612 \tuptime= 2470.39 seconds \tcounter= 2\n host2:1615 \tuptime= 3723.53 seconds \tcounter= 8\n host2:2822 \tuptime= 2279.16 seconds \tcounter= 3\n\nEugene Demchenko aka Goldy skype demchenkoe email demchenkoev@gmail.com\n", "readmeFilename": "README.md",