Skip to content

Commit 047e7f6

Browse files
committed
AMQP: Use exchange instead of queue for discovery
References msgflo/msgflo#13
1 parent 6a87305 commit 047e7f6

File tree

1 file changed

+18
-7
lines changed

1 file changed

+18
-7
lines changed

src/amqp.coffee

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
debug = require('debug')('msgflo:amqp')
33
async = require 'async'
44
interfaces = require './interfaces'
5+
uuid = require 'uuid'
56

67
try
78
amqp = require 'amqplib/callback_api'
@@ -144,10 +145,12 @@ class Client extends interfaces.MessagingClient
144145
protocol: 'discovery'
145146
command: 'participant'
146147
payload: part
147-
@channel.assertQueue 'fbp'
148-
data = new Buffer JSON.stringify msg
149-
@channel.sendToQueue 'fbp', data
150-
return callback null
148+
exchangeName = 'fbp'
149+
@channel.assertExchange exchangeName, 'fanout', {}, (err) =>
150+
return callback err if err
151+
data = new Buffer JSON.stringify msg
152+
@channel.publish exchangeName, '', data
153+
return callback null
151154

152155
class MessageBroker extends Client
153156
constructor: (address, options) ->
@@ -221,9 +224,17 @@ class MessageBroker extends Client
221224
data: data
222225
return handler out
223226

224-
@channel.assertQueue 'fbp'
225-
@channel.consume 'fbp', deserialize
226-
return callback null
227+
exchangeName = 'fbp'
228+
@channel.assertExchange exchangeName, 'fanout', {}, (err) =>
229+
return callback err if err
230+
subscribeQueue = '.fbp-subscribe-' + uuid.v4()
231+
@channel.assertQueue subscribeQueue, { persistent: false }, (err) =>
232+
return callback err if err
233+
@channel.bindQueue subscribeQueue, exchangeName, '', {}, (err) =>
234+
return callback err if err
235+
@channel.consume subscribeQueue, deserialize
236+
debug 'subscribed to', subscribeQueue, exchangeName
237+
return callback null
227238

228239
exports.Client = Client
229240
exports.MessageBroker = MessageBroker

0 commit comments

Comments
 (0)