Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions lib/bindings/AMQPBinding.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ function start(callback) {

config.getLogger().info(context, 'Starting AMQP binding');

function createConnection(callback) {
function createConnection(callbackcreateConnection) {
config.getLogger().info(context, 'creating connnection');
if (isConnecting) {
return;
Expand All @@ -161,12 +161,12 @@ function start(callback) {
config.getLogger().error(context, err.message);
if (numRetried <= retries) {
numRetried++;
return setTimeout(createConnection, retryTime * 1000, callback);
return setTimeout(createConnection, retryTime * 1000, callbackcreateConnection);
}
} else {
conn.on('error', function(err) {
if (err.message !== 'Connection closing') {
config.getLogger().error(context, err.message);
conn.on('error', function(errConn) {
if (errConn.message !== 'Connection closing') {
config.getLogger().error(context, errConn.message);
}
});
conn.on('close', function() {
Expand All @@ -183,43 +183,43 @@ function start(callback) {
});
config.getLogger().info(context, 'connected');
amqpConn = conn;
if (callback) {
callback();
if (callbackcreateConnection) {
callbackcreateConnection();
}
}
}
);
}

function createChannel(callback) {
function createChannel(callbackcreateChannel) {
config.getLogger().debug(context, 'channel creating');
amqpConn.createChannel(function(err, ch) {
if (err) {
config.getLogger().error(context, err.message);
}
config.getLogger().debug(context, 'channel created');
amqpChannel = ch;
callback();
callbackcreateChannel();
});
}

function assertExchange(callback) {
function assertExchange(callbackassertExchange) {
if (amqpChannel) {
config.getLogger().debug(context, 'asserting exchange');
amqpChannel.assertExchange(exchange, 'topic', {});
config.getLogger().debug(context, 'exchange asserted');
callback();
callbackassertExchange();
}
}

function assertQueue(callback) {
function assertQueue(callbackassertQueue) {
config.getLogger().debug(context, 'asserting queues');
amqpChannel.assertQueue(queue, { exclusive: false }, function() {
amqpChannel.assertQueue(queue + '_commands', { exclusive: false }, callback);
amqpChannel.assertQueue(queue + '_commands', { exclusive: false }, callbackassertQueue);
});
}

function createListener(queueObj, callback) {
function createListener(queueObj, callbackcreateListener) {
config.getLogger().debug(context, 'creating listeners');
amqpChannel.bindQueue(queue, exchange, '.*.*.attrs.#');
amqpChannel.consume(queue, queueListener, { noAck: true });
Expand All @@ -228,7 +228,7 @@ function start(callback) {
amqpChannel.bindQueue(queue + '_commands', exchange, '.*.*.cmdexe');
amqpChannel.consume(queue + '_commands', queueListener, { noAck: true });
config.getLogger().debug(context, 'subscribed to command queue');
callback();
callbackcreateListener();
}

async.waterfall([createConnection, createChannel, assertExchange, assertQueue, createListener], function(error) {
Expand Down
121 changes: 112 additions & 9 deletions lib/bindings/HTTPBindings.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ var http = require('http'),
context = {
op: 'IOTAUL.HTTP.Binding'
},
swaggerUi = require('swagger-ui-express'),
swaggerJSDoc = require('swagger-jsdoc'),
transport = 'HTTP';

function handleError(error, req, res, next) {
function handleError(error, req, res) {
var code = 500;

config.getLogger().debug(context, 'Error [%s] handing request: %s', error.name, error.message);
Expand Down Expand Up @@ -135,7 +137,7 @@ function checkMandatoryParams(queryPayload) {
* This middleware checks whether there is any polling command pending to be sent to the device. If there is some,
* add the command information to the return payload. Otherwise it returns an empty payload.
*/
function returnCommands(req, res, next) {
function returnCommands(req, res) {
function updateCommandStatus(device, commandList) {
var updates, cleanCommands;

Expand All @@ -159,7 +161,7 @@ function returnCommands(req, res, next) {
updates = commandList.map(createCommandUpdate);
cleanCommands = commandList.map(cleanCommand);

async.parallel(updates.concat(cleanCommands), function(error, results) {
async.parallel(updates.concat(cleanCommands), function(error) {
if (error) {
// prettier-ignore
config.getLogger().error(
Expand Down Expand Up @@ -339,20 +341,20 @@ function generateCommandExecution(apiKey, device, attribute) {
*/
function commandHandler(device, attributes, callback) {
utils.getEffectiveApiKey(device.service, device.subservice, function(error, apiKey) {
async.series(attributes.map(generateCommandExecution.bind(null, apiKey, device)), function(error) {
if (error) {
async.series(attributes.map(generateCommandExecution.bind(null, apiKey, device)), function(errorAsync) {
if (errorAsync) {
// prettier-ignore
config.getLogger().error(context,
'COMMANDS-004: Error handling incoming command for device [%s]', device.id);

utils.updateCommand(
apiKey,
device,
error.message,
error.command,
errorAsync.message,
errorAsync.command,
constants.COMMAND_STATUS_ERROR,
function(error) {
if (error) {
function(errorUpdateCommand) {
if (errorUpdateCommand) {
// prettier-ignore
config.getLogger().error(
context,
Expand Down Expand Up @@ -406,11 +408,69 @@ function start(callback) {
router: express.Router()
};

var options = {
swaggerDefinition: {
info: {
title: 'IoT Agent UL2 - HTTP',
version: '1.0.1',
description: 'This documentation explains the requests to the route /iot/d'
}
},
apis: ['./lib/bindings/*']
};
var swaggerSpec = swaggerJSDoc(options);

httpBindingServer.app.use('/api-docs', swaggerUi.serve, swaggerUi.setup(swaggerSpec));

config.getLogger().info(context, 'HTTP Binding listening on port [%s]', config.getConfig().http.port);

httpBindingServer.app.set('port', config.getConfig().http.port);
httpBindingServer.app.set('host', config.getConfig().http.host || '0.0.0.0');

/**
* @swagger
*
* /iot/d:
* get:
* tags:
* - "iot/d"
* summary: "Report new measures"
* description: "A device can report new measures to the IoT Platform"
* consumes:
* - "application/json"
* produces:
* - "application/json"
* parameters:
* - in: "query"
* name: "i"
* description: "Device ID"
* type: string
* required: true
* - in: "query"
* name: "k"
* description: "API Key for the service the device is registered on"
* type: string
* required: true
* - in: "query"
* name: "d"
* description: "Ultralight 2.0 payload"
* type: string
* required: true
* - in: "query"
* name: "t"
* description: "Timestamp of the measure"
* type: timestamp
* required: false
* responses:
* 200:
* description: "The new measure has been registered "
* 404:
* description: 'No device was found with "device_name"'
* DEVICE_GROUP_NOT_FOUND:
* description: "Could not find device group"
* PARSE_ERROR:
* description: 'There was a syntax error in the Ultralight request'
*/
httpBindingServer.router.get(
config.getConfig().iota.defaultResource || constants.HTTP_MEASURE_PATH,
checkMandatoryParams(true),
Expand All @@ -420,6 +480,49 @@ function start(callback) {
returnCommands
);

/**
* @swagger
*
* /iot/d:
* post:
* tags:
* - "iot/d"
* summary: "Registrer a new measure"
* description: "This request add a new measure to the datebase."
* operationId: "addDevices"
* consumes:
* - text/plain
* parameters:
* - in: query
* name: "i"
* description: "Device ID"
* type: string
* required: true
* - in: query
* name: "k"
* description: "API key. Service identification."
* type: string
* required: true
* - in: query
* name: "t"
* description: "Date and time of register"
* type: timestamp
* required: false
* - in: body
* name: "Sensors"
* description: 'Sensors Measurements'
* type: string
* required: true
* responses:
* 200:
* description: "Report new measures to the IoT Platform"
* 404:
* description: 'DEVICE_NOT_FOUND - No device was found with "device_name"'
* DEVICE_GROUP_NOT_FOUND:
* description: "Could not find device group"
* PARSE_ERROR:
* description: 'There was a syntax error in the Ultralight request.'
*/
httpBindingServer.router.post(
config.getConfig().iota.defaultResource || constants.HTTP_MEASURE_PATH,
addDefaultHeader,
Expand Down
14 changes: 7 additions & 7 deletions lib/bindings/MQTTBinding.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,19 @@ function generateTopics(callback) {
function recreateSubscriptions(callback) {
config.getLogger().debug(context, 'Recreating global subscriptions');

function subscribeToTopics(topics, callback) {
function subscribeToTopics(topics, callbackSubscribeToTopics) {
config.getLogger().debug('Subscribing to topics: %j', topics);

mqttClient.subscribe(topics, null, function(error) {
if (error) {
iotAgentLib.alarms.raise(constants.MQTTB_ALARM, error);
config.getLogger().error(context, ' GLOBAL-001: Error subscribing to topics: %s', error);
callback(error);
callbackSubscribeToTopics(error);
} else {
iotAgentLib.alarms.release(constants.MQTTB_ALARM);
config.getLogger().debug('Successfully subscribed to the following topics:\n%j\n', topics);
if (callback) {
callback(null);
if (callbackSubscribeToTopics) {
callbackSubscribeToTopics(null);
}
}
});
Expand All @@ -83,10 +83,10 @@ function recreateSubscriptions(callback) {
* Unsubscribe the MQTT Client for all the topics of all the devices of all the services.
*/
function unsubscribeAll(callback) {
function unsubscribeFromTopics(topics, callback) {
function unsubscribeFromTopics(topics, callbackUnsubscribeFromTopics) {
mqttClient.unsubscribe(topics, null);

callback();
callbackUnsubscribeFromTopics();
}

async.waterfall([generateTopics, unsubscribeFromTopics], callback);
Expand Down Expand Up @@ -252,7 +252,7 @@ function start(callback) {
}
});
mqttClient.on('message', commonBindings.mqttMessageHandler);
mqttClient.on('connect', function(ack) {
mqttClient.on('connect', function() {
config.getLogger().info(context, 'MQTT Client connected');
recreateSubscriptions();
});
Expand Down
22 changes: 11 additions & 11 deletions lib/commonBindings.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ function manageConfigurationRequest(apiKey, deviceId, device, objMessage) {
* @param {Number} index Index of the group in the array.
* @return {Array} Updated array of functions.
*/
function processMeasureGroup(device, apikey, previous, current, index) {
function processMeasureGroup(device, apikey, previous, current) {
var values = [];

if (current.command) {
Expand Down Expand Up @@ -250,20 +250,20 @@ function messageHandler(topic, message, protocol) {
messageStr = message.toString(),
parsedMessage;

function processMessageForDevice(device, apiKey, topicInformation) {
function processMessageForDevice(device, apiKeyProcessMessage, topicInformationProcessMessage) {
iotAgentLib.alarms.release(constants.MQTTB_ALARM);

if (
topicInformation[3] === constants.CONFIGURATION_SUFIX &&
topicInformation[4] === constants.CONFIGURATION_COMMAND_SUFIX &&
topicInformationProcessMessage[3] === constants.CONFIGURATION_SUFIX &&
topicInformationProcessMessage[4] === constants.CONFIGURATION_COMMAND_SUFIX &&
message
) {
parsedMessage = ulParser.parseConfigurationRequest(messageStr);
manageConfigurationRequest(apiKey, deviceId, device, parsedMessage);
} else if (topicInformation[3] === constants.CONFIGURATION_COMMAND_UPDATE) {
manageConfigurationRequest(apiKeyProcessMessage, deviceId, device, parsedMessage);
} else if (topicInformationProcessMessage[3] === constants.CONFIGURATION_COMMAND_UPDATE) {
var commandObj = ulParser.result(message.toString());
utils.updateCommand(
apiKey,
apiKeyProcessMessage,
device,
commandObj.result,
commandObj.command,
Expand All @@ -272,10 +272,10 @@ function messageHandler(topic, message, protocol) {
config.getLogger().debug('Command updated with result: %s', error);
}
);
} else if (topicInformation[4]) {
singleMeasure(apiKey, topicInformation[4], device, message);
} else if (topicInformation[3] === constants.MEASURES_SUFIX) {
multipleMeasures(apiKey, device, message.toString());
} else if (topicInformationProcessMessage[4]) {
singleMeasure(apiKeyProcessMessage, topicInformationProcessMessage[4], device, message);
} else if (topicInformationProcessMessage[3] === constants.MEASURES_SUFIX) {
multipleMeasures(apiKeyProcessMessage, device, message.toString());
} else {
config.getLogger().error(
context,
Expand Down
6 changes: 4 additions & 2 deletions lib/iotaUtils.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ function findOrCreate(deviceId, transport, group, callback) {
newDevice.timestamp = group.timestamp;
}

iotAgentLib.register(newDevice, function(error, device) {
callback(error, device, group);
iotAgentLib.register(newDevice, function(errorRegister, deviceRegister) {
callback(errorRegister, deviceRegister, group);
});
} else {
callback(error);
Expand Down Expand Up @@ -148,6 +148,8 @@ function mergeDeviceWithConfiguration(deviceData, configuration, callback) {
deviceData[fields[i]] = configuration[confField];
} else if (!deviceData[fields[i]] && (!configuration || !configuration[confField])) {
deviceData[fields[i]] = defaults[i];
} else {
config.getLogger().error(context, 'There is no possible merge');
}

if (deviceData[fields[i]] && ['active', 'lazy', 'commands'].indexOf(fields[i]) >= 0) {
Expand Down
Loading