From ca636ff95f3d06d4ae6ef6f19f564c520cdd5c50 Mon Sep 17 00:00:00 2001 From: Jon Nordby Date: Mon, 27 Feb 2017 16:38:39 +0100 Subject: [PATCH 1/3] broker.subscribeParticipantChange(): Add callback --- src/amqp.coffee | 8 +++++++- src/direct.coffee | 11 +++++++++-- src/interfaces.coffee | 2 +- src/mqtt.coffee | 11 +++++++++-- 4 files changed, 26 insertions(+), 6 deletions(-) diff --git a/src/amqp.coffee b/src/amqp.coffee index 2efc41c..7bc1f5d 100644 --- a/src/amqp.coffee +++ b/src/amqp.coffee @@ -203,7 +203,12 @@ class MessageBroker extends Client return callback null # Participant registration - subscribeParticipantChange: (handler) -> + subscribeParticipantChange: (handler, callback) -> + defaultCallback = (err) -> + if err + console.err "Error in msgflo.amqp.subscribeParticipantChange, and no callback added", err + callback = defaultCallback if not callback + deserialize = (message) => debug 'receive on fbp', message.fields.deliveryTag data = null @@ -218,6 +223,7 @@ class MessageBroker extends Client @channel.assertQueue 'fbp' @channel.consume 'fbp', deserialize + return callback null exports.Client = Client exports.MessageBroker = MessageBroker diff --git a/src/direct.coffee b/src/direct.coffee index 6aa85c0..b4e2c6b 100644 --- a/src/direct.coffee +++ b/src/direct.coffee @@ -121,9 +121,16 @@ class MessageBroker extends interfaces.MessageBroker nackMessage: (message) -> return - subscribeParticipantChange: (handler) -> + subscribeParticipantChange: (handler, callback) -> + defaultCallback = (err) -> + if err + console.err "Error in msgflo.direct.subscribeParticipantChange, and no callback added", err + callback = defaultCallback if not callback + @createQueue '', 'fbp', (err) => - @subscribeToQueue 'fbp', handler, () -> + return callback err if err + @subscribeToQueue 'fbp', handler, (err) -> + return callback err exports.MessageBroker = MessageBroker exports.Client = Client diff --git a/src/interfaces.coffee b/src/interfaces.coffee index 4a8f68d..afd5c8b 100644 --- a/src/interfaces.coffee +++ b/src/interfaces.coffee @@ -73,7 +73,7 @@ class MessageBroker extends MessagingSystem throw new Error 'Not Implemented' # Participant registration - subscribeParticipantChange: (handler) -> + subscribeParticipantChange: (handler, callback) -> throw new Error 'Not Implemented' exports.MessageBroker = MessageBroker diff --git a/src/mqtt.coffee b/src/mqtt.coffee index a6f3a8d..380c72a 100644 --- a/src/mqtt.coffee +++ b/src/mqtt.coffee @@ -125,9 +125,16 @@ class MessageBroker extends Client routing.binderMixin this # Participant registration - subscribeParticipantChange: (handler) -> + subscribeParticipantChange: (handler, callback) -> + defaultCallback = (err) -> + if err + console.err "Error in msgflo.mqtt.subscribeParticipantChange, and no callback added", err + callback = defaultCallback if not callback + @createQueue '', 'fbp', (err) => - @subscribeToQueue 'fbp', handler, () -> + return callback err if err + @subscribeToQueue 'fbp', handler, (err) -> + return callback err exports.Client = Client exports.MessageBroker = MessageBroker From 6a87305dbbc7ef5608ae5dae60e4eab81c5aac04 Mon Sep 17 00:00:00 2001 From: Jon Nordby Date: Mon, 27 Feb 2017 17:00:59 +0100 Subject: [PATCH 2/3] tests: Check discovery between client/broker Was completely uncovered in our tests. Though `msgflo` has some that probably exercise this --- spec/01transport.coffee | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/spec/01transport.coffee b/spec/01transport.coffee index 7d41823..36515a6 100644 --- a/spec/01transport.coffee +++ b/spec/01transport.coffee @@ -119,6 +119,30 @@ transportTests = (type) -> clientA.connect (err) -> done err + describe 'sending participant registration message', -> + client = null + beforeEach (done) -> + client = transport.getClient address + return client.connect done + afterEach (done) -> + return client.disconnect done + + it 'should be received by subscribed broker', (done) -> + definition = + id: '123' + role: 'role' + component: 'lib/Component' + inports: [] + outports: [] + onDiscover = (message) -> + got = message.data.payload + chai.expect(got).to.eql definition + return done() + broker.subscribeParticipantChange onDiscover, (err) -> + return done err if err + client.registerParticipant definition, (err) -> + return done err if err + describe 'outqueue without subscribers', -> it 'sending should not error', (done) -> payload = { foo: 'bar91' } From 047e7f6da5abce50c9a074a14e94b9d9de6e9cfa Mon Sep 17 00:00:00 2001 From: Jon Nordby Date: Mon, 27 Feb 2017 17:11:30 +0100 Subject: [PATCH 3/3] AMQP: Use exchange instead of queue for discovery References msgflo/msgflo#13 --- src/amqp.coffee | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/src/amqp.coffee b/src/amqp.coffee index 7bc1f5d..b4185ca 100644 --- a/src/amqp.coffee +++ b/src/amqp.coffee @@ -2,6 +2,7 @@ debug = require('debug')('msgflo:amqp') async = require 'async' interfaces = require './interfaces' +uuid = require 'uuid' try amqp = require 'amqplib/callback_api' @@ -144,10 +145,12 @@ class Client extends interfaces.MessagingClient protocol: 'discovery' command: 'participant' payload: part - @channel.assertQueue 'fbp' - data = new Buffer JSON.stringify msg - @channel.sendToQueue 'fbp', data - return callback null + exchangeName = 'fbp' + @channel.assertExchange exchangeName, 'fanout', {}, (err) => + return callback err if err + data = new Buffer JSON.stringify msg + @channel.publish exchangeName, '', data + return callback null class MessageBroker extends Client constructor: (address, options) -> @@ -221,9 +224,17 @@ class MessageBroker extends Client data: data return handler out - @channel.assertQueue 'fbp' - @channel.consume 'fbp', deserialize - return callback null + exchangeName = 'fbp' + @channel.assertExchange exchangeName, 'fanout', {}, (err) => + return callback err if err + subscribeQueue = '.fbp-subscribe-' + uuid.v4() + @channel.assertQueue subscribeQueue, { persistent: false }, (err) => + return callback err if err + @channel.bindQueue subscribeQueue, exchangeName, '', {}, (err) => + return callback err if err + @channel.consume subscribeQueue, deserialize + debug 'subscribed to', subscribeQueue, exchangeName + return callback null exports.Client = Client exports.MessageBroker = MessageBroker