Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
Fix: Set 60 seconds for default mqtt keepalive option (#370)
Add: virtual host configuration added to AMQP transport (config.amqp.vhost and IOTA_AMQP_VHOST env var)
1 change: 1 addition & 0 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ config.mqtt = {
config.amqp = {
host: 'localhost',
port: 5672,
// vhost: "custom-virtual-host",
// username: 'guest',
// password: 'guest',
exchange: 'iota-exchange',
Expand Down
3 changes: 3 additions & 0 deletions docs/installationguide.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ IoT Agent. The following attributes are accepted:
default is `mqtt`
- **host**: Host where the MQTT Broker is located.
- **port**: Port where the MQTT Broker is listening
- **vhost**: virtual host in the AMQP Broker to which IoT Agent will connect (optional).
- **username**: Username for the IoT Agent in the MQTT broker, if authentication is activated.
- **password**: Password for the IoT Agent in the MQTT broker, if authentication is activated.
- **ca**: ca certificates to use for validating server certificates (optional). Default is to trust the well-known CAs
Expand Down Expand Up @@ -239,6 +240,7 @@ The ones relating specific Ultralight 2.0 bindings are described in the followin
| IOTA_MQTT_PROTOCOL | mqtt.protocol |
| IOTA_MQTT_HOST | mqtt.host |
| IOTA_MQTT_PORT | mqtt.port |
| IOTA_MQTT_VHOST | mqtt.vhost |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I understand, the new vhost parameter is for AMQP configuration, not MQTT (looking to "twin" PR telefonicaid/iotagent-json#527 it only appears in the AMQP section of the configuration).

Is this a typo? Or maybe I'm missing something? :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @fgalan and sorry for the delay. Yes, it was a typo. I've updated the PR. Thanks.

| IOTA_MQTT_CA | mqtt.ca |
| IOTA_MQTT_CERT | mqtt.cert |
| IOTA_MQTT_KEY | mqtt.key |
Expand All @@ -253,6 +255,7 @@ The ones relating specific Ultralight 2.0 bindings are described in the followin
| IOTA_MQTT_AVOID_LEADING_SLASH | mqtt.avoidLeadingSlash |
| IOTA_AMQP_HOST | amqp.host |
| IOTA_AMQP_PORT | amqp.port |
| IOTA_AMQP_VHOST | amqp.vhost |
| IOTA_AMQP_USERNAME | amqp.username |
| IOTA_AMQP_PASSWORD | amqp.password |
| IOTA_AMQP_EXCHANGE | amqp.exchange |
Expand Down
17 changes: 10 additions & 7 deletions lib/bindings/AMQPBinding.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ function generateCommandExecution(apiKey, device, attribute) {
function commandHandler(device, attributes, callback) {
config.getLogger().debug(context, 'Handling AMQP command for device [%s]', device.id);

utils.getEffectiveApiKey(device.service, device.subservice, device, function(error, apiKey) {
utils.getEffectiveApiKey(device.service, device.subservice, device, function (error, apiKey) {
async.series(attributes.map(generateCommandExecution.bind(null, apiKey, device)), callback);
});
}
Expand Down Expand Up @@ -151,6 +151,9 @@ function start(callback) {
uri += ':' + config.getConfig().amqp.port;
}
}
if (config.getConfig().amqp.vhost && config.getConfig().amqp.vhost !== '/') {
uri += '/' + config.getConfig().amqp.vhost;
}
} else {
return config.getLogger().error(context, 'Error AMQP is not configured');
}
Expand All @@ -167,7 +170,7 @@ function start(callback) {
isConnecting = true;
amqp.connect(
uri,
function(err, conn) {
function (err, conn) {
isConnecting = false;
// try again
if (err) {
Expand All @@ -177,12 +180,12 @@ function start(callback) {
return setTimeout(createConnection, retryTime * 1000, callback);
}
} else {
conn.on('error', function(err) {
conn.on('error', function (err) {
if (err.message !== 'Connection closing') {
config.getLogger().error(context, err.message);
}
});
conn.on('close', function() {
conn.on('close', function () {
// If amqpConn is null, the connection has been closed on purpose
if (amqpConn) {
config.getLogger().error(context, 'reconnecting');
Expand All @@ -204,7 +207,7 @@ function start(callback) {

function createChannel(callback) {
config.getLogger().debug(context, 'channel creating');
amqpConn.createChannel(function(err, ch) {
amqpConn.createChannel(function (err, ch) {
if (err) {
config.getLogger().error(context, err.message);
}
Expand All @@ -225,7 +228,7 @@ function start(callback) {

function assertQueue(callback) {
config.getLogger().debug(context, 'asserting queues');
amqpChannel.assertQueue(queue, { exclusive: false }, function() {
amqpChannel.assertQueue(queue, { exclusive: false }, function () {
amqpChannel.assertQueue(queue + '_commands', { exclusive: false }, callback);
});
}
Expand All @@ -242,7 +245,7 @@ function start(callback) {
callback();
}

async.waterfall([createConnection, createChannel, assertExchange, assertQueue, createListener], function(error) {
async.waterfall([createConnection, createChannel, assertExchange, assertQueue, createListener], function (error) {
if (error) {
config.getLogger().debug('AMQP error %j', error);
}
Expand Down
6 changes: 6 additions & 0 deletions lib/configService.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ function processEnvironmentVariables() {
'IOTA_MQTT_AVOID_LEADING_SLASHES',
'IOTA_AMQP_HOST',
'IOTA_AMQP_PORT',
'IOTA_AMQP_VHOST',
'IOTA_AMQP_USERNAME',
'IOTA_AMQP_PASSWORD',
'IOTA_AMQP_EXCHANGE',
Expand Down Expand Up @@ -102,6 +103,7 @@ function processEnvironmentVariables() {
const amqpVariables = [
'IOTA_AMQP_HOST',
'IOTA_AMQP_PORT',
'IOTA_AMQP_VHOST',
'IOTA_AMQP_USERNAME',
'IOTA_AMQP_PASSWORD',
'IOTA_AMQP_EXCHANGE',
Expand Down Expand Up @@ -212,6 +214,10 @@ function processEnvironmentVariables() {
config.amqp.port = process.env.IOTA_AMQP_PORT;
}

if (process.env.IOTA_AMQP_VHOST) {
config.amqp.vhost = process.env.IOTA_AMQP_VHOST;
}

if (process.env.IOTA_AMQP_USERNAME) {
config.amqp.username = process.env.IOTA_AMQP_USERNAME;
}
Expand Down
3 changes: 3 additions & 0 deletions test/unit/startup-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ describe('Startup tests', function() {
beforeEach(function() {
process.env.IOTA_AMQP_HOST = 'localhost';
process.env.IOTA_AMQP_PORT = '9090';
process.env.IOTA_AMQP_VHOST = 'custom_vhost';
process.env.IOTA_AMQP_USERNAME = 'useramqp';
process.env.IOTA_AMQP_PASSWORD = 'passamqp';
process.env.IOTA_AMQP_EXCHANGE = 'xxx';
Expand All @@ -99,6 +100,7 @@ describe('Startup tests', function() {
afterEach(function() {
delete process.env.IOTA_AMQP_HOST;
delete process.env.IOTA_AMQP_PORT;
delete process.env.IOTA_AMQP_VHOST;
delete process.env.IOTA_AMQP_USERNAME;
delete process.env.IOTA_AMQP_PASSWORD;
delete process.env.IOTA_AMQP_EXCHANGE;
Expand All @@ -112,6 +114,7 @@ describe('Startup tests', function() {
config.setConfig(iotAgentConfig);
config.getConfig().amqp.host.should.equal('localhost');
config.getConfig().amqp.port.should.equal('9090');
config.getConfig().amqp.vhost.should.equal('custom_vhost');
config.getConfig().amqp.username.should.equal('useramqp');
config.getConfig().amqp.password.should.equal('passamqp');
config.getConfig().amqp.exchange.should.equal('xxx');
Expand Down