Skip to content

Update coreMQTT-Agent Library to MQTTv5 #129

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 51 additions & 32 deletions source/core_mqtt_agent.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,20 @@ static MQTTStatus_t processCommand( MQTTAgentContext_t * pMqttAgentContext,
*
* @param[in] pMqttContext MQTT Context
* @param[in] pPacketInfo Pointer to incoming packet.
* @param[in] pDeserializedInfo Pointer to deserialized information from
* the incoming packet.
* @param[in] pDeserializedInfo Pointer to deserialized information from the incoming packet.
* @param[out] pReasonCode Pointer to a variable where the application can set the reason code
* to include in outgoing PUBLISH ACK responses.
* @param[out] sendPropsBuffer Pointer to the MQTT property builder. The application can use this
* to add properties to the outgoing response packet.
* @param[in] getPropsBuffer Pointer to the MQTT property accessor. The application can use this
* to read properties received in the incoming MQTT packet.
*/
static void mqttEventCallback( MQTTContext_t * pMqttContext,
MQTTPacketInfo_t * pPacketInfo,
MQTTDeserializedInfo_t * pDeserializedInfo );
MQTTDeserializedInfo_t * pDeserializedInfo,
MQTTSuccessFailReasonCode_t * pReasonCode,
MQTTPropBuilder_t * sendPropsBuffer,
MQTTPropBuilder_t * getPropsBuffer);

/**
* @brief Mark a command as complete after receiving an acknowledgment packet.
Expand Down Expand Up @@ -426,9 +434,7 @@ static MQTTStatus_t createCommand( MQTTAgentCommandType_t commandType,
{
bool isValid, isSpace = true;
MQTTStatus_t statusReturn;
const MQTTPublishInfo_t * pPublishInfo;
size_t uxHeaderBytes;
const size_t uxControlAndLengthBytes = ( size_t ) 4; /* Control, remaining length and length bytes. */
const MQTTAgentPublishArgs_t * pPublishArgs = NULL;

assert( pMqttAgentContext != NULL );
assert( pCommand != NULL );
Expand All @@ -452,26 +458,21 @@ static MQTTStatus_t createCommand( MQTTAgentCommandType_t commandType,
break;

case PUBLISH:
pPublishInfo = ( const MQTTPublishInfo_t * ) pMqttInfoParam;

/* Calculate the space consumed by everything other than the
* payload. */
uxHeaderBytes = uxControlAndLengthBytes;
uxHeaderBytes += pPublishInfo->topicNameLength;
assert( pMqttInfoParam != NULL );
pPublishArgs = ( const MQTTAgentPublishArgs_t * ) pMqttInfoParam;

/* This message type results in the broker returning an ACK. The
* agent maintains an array of outstanding ACK messages. See if
* the array contains space for another outstanding ack. QoS0
* publish does not result in an ack so it doesn't matter if
* there is no space in the ACK array. */
if( pPublishInfo->qos != MQTTQoS0 )
if( pPublishArgs->pPublishInfo->qos != MQTTQoS0 )
{
isSpace = isSpaceInPendingAckList( pMqttAgentContext );
}

/* Will the message fit in the defined buffer? */
isValid = ( uxHeaderBytes < pMqttAgentContext->mqttContext.networkBuffer.size ) &&
( isSpace == true );
isValid = isSpace ;

break;

Expand All @@ -495,7 +496,7 @@ static MQTTStatus_t createCommand( MQTTAgentCommandType_t commandType,

statusReturn = ( isValid ) ? MQTTSuccess : MQTTBadParameter;

if( ( statusReturn == MQTTBadParameter ) && ( isSpace == false ) )
if( statusReturn == MQTTBadParameter )
{
/* The error was caused not by a bad parameter, but because there was
* no room in the pending Ack list for the Ack response to an outgoing
Expand Down Expand Up @@ -652,13 +653,20 @@ static MQTTAgentContext_t * getAgentFromMQTTContext( MQTTContext_t * pMQTTContex

static void mqttEventCallback( MQTTContext_t * pMqttContext,
MQTTPacketInfo_t * pPacketInfo,
MQTTDeserializedInfo_t * pDeserializedInfo )
MQTTDeserializedInfo_t * pDeserializedInfo,
MQTTSuccessFailReasonCode_t * pReasonCode,
MQTTPropBuilder_t * sendPropsBuffer,
MQTTPropBuilder_t * getPropsBuffer)
{
MQTTAgentAckInfo_t * pAckInfo;
uint16_t packetIdentifier = pDeserializedInfo->packetIdentifier;
MQTTAgentContext_t * pAgentContext;
const uint8_t upperNibble = ( uint8_t ) 0xF0;

( void ) sendPropsBuffer; /* Unused parameter. */
( void ) getPropsBuffer; /* Unused parameter. */
( void ) pReasonCode; /* Unused parameter. */

assert( pMqttContext != NULL );
assert( pPacketInfo != NULL );

Expand Down Expand Up @@ -819,9 +827,10 @@ static MQTTStatus_t resendPublishes( MQTTAgentContext_t * pMqttAgentContext )
MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER;
uint16_t packetId = MQTT_PACKET_ID_INVALID;
MQTTAgentAckInfo_t * pFoundAck = NULL;
MQTTPublishInfo_t * pOriginalPublish = NULL;
MQTTAgentPublishArgs_t * pOriginalPublish = NULL;
MQTTContext_t * pMqttContext;


assert( pMqttAgentContext != NULL );
pMqttContext = &( pMqttAgentContext->mqttContext );

Expand All @@ -835,9 +844,9 @@ static MQTTStatus_t resendPublishes( MQTTAgentContext_t * pMqttAgentContext )
if( pFoundAck != NULL )
{
/* Set the DUP flag. */
pOriginalPublish = ( MQTTPublishInfo_t * ) ( pFoundAck->pOriginalCommand->pArgs );
pOriginalPublish->dup = true;
statusResult = MQTT_Publish( pMqttContext, pOriginalPublish, packetId );
pOriginalPublish = ( MQTTAgentPublishArgs_t * ) ( pFoundAck->pOriginalCommand->pArgs );
pOriginalPublish->pPublishInfo->dup = true;
statusResult = MQTT_Publish( pMqttContext, pOriginalPublish->pPublishInfo , packetId, pOriginalPublish->pProperties );

if( statusResult != MQTTSuccess )
{
Expand Down Expand Up @@ -932,6 +941,7 @@ static bool validateParams( MQTTAgentCommandType_t commandType,
bool ret = false;
const MQTTAgentConnectArgs_t * pConnectArgs = NULL;
const MQTTAgentSubscribeArgs_t * pSubscribeArgs = NULL;
const MQTTAgentPublishArgs_t * pPublishArgs = NULL;

assert( ( commandType == CONNECT ) || ( commandType == PUBLISH ) ||
( commandType == SUBSCRIBE ) || ( commandType == UNSUBSCRIBE ) );
Expand All @@ -941,7 +951,8 @@ static bool validateParams( MQTTAgentCommandType_t commandType,
case CONNECT:
pConnectArgs = ( const MQTTAgentConnectArgs_t * ) pParams;
ret = ( ( pConnectArgs != NULL ) &&
( pConnectArgs->pConnectInfo != NULL ) );
( pConnectArgs->pConnectInfo != NULL ) &&
( ( pConnectArgs->pWillInfo != NULL ) || ( pConnectArgs->pWillProperties == NULL ) ) );
break;

case SUBSCRIBE:
Expand All @@ -951,9 +962,12 @@ static bool validateParams( MQTTAgentCommandType_t commandType,
( pSubscribeArgs->pSubscribeInfo != NULL ) &&
( pSubscribeArgs->numSubscriptions != 0U ) );
break;

case PUBLISH:
pPublishArgs = ( const MQTTAgentPublishArgs_t * ) pParams;
ret = ( ( pPublishArgs != NULL ) &&
( pPublishArgs->pPublishInfo != NULL ) );
break ;
default:
/* Publish, does not need to be cast since we do not check it. */
ret = ( pParams != NULL );
break;
}
Expand All @@ -969,10 +983,11 @@ MQTTStatus_t MQTTAgent_Init( MQTTAgentContext_t * pMqttAgentContext,
const TransportInterface_t * pTransportInterface,
MQTTGetCurrentTimeFunc_t getCurrentTimeMs,
MQTTAgentIncomingPublishCallback_t incomingCallback,
void * pIncomingPacketContext )
void * pIncomingPacketContext,
uint8_t * pAckPropsBuffer,
size_t ackPropsBufferSize )
{
MQTTStatus_t returnStatus;

/**
* @brief Array used to maintain the outgoing publish records and their
* state by the coreMQTT library.
Expand Down Expand Up @@ -1020,7 +1035,9 @@ MQTTStatus_t MQTTAgent_Init( MQTTAgentContext_t * pMqttAgentContext,
pOutgoingPublishRecords,
MQTT_AGENT_MAX_OUTSTANDING_ACKS,
pIncomingPublishRecords,
MQTT_AGENT_MAX_OUTSTANDING_ACKS );
MQTT_AGENT_MAX_OUTSTANDING_ACKS,
pAckPropsBuffer,
ackPropsBufferSize );
}
}
#endif /* if ( MQTT_AGENT_USE_QOS_1_2_PUBLISH != 0 ) */
Expand Down Expand Up @@ -1223,20 +1240,20 @@ MQTTStatus_t MQTTAgent_Unsubscribe( const MQTTAgentContext_t * pMqttAgentContext
/*-----------------------------------------------------------*/

MQTTStatus_t MQTTAgent_Publish( const MQTTAgentContext_t * pMqttAgentContext,
MQTTPublishInfo_t * pPublishInfo,
MQTTAgentPublishArgs_t * pPublishArgs,
const MQTTAgentCommandInfo_t * pCommandInfo )
{
MQTTStatus_t statusReturn = MQTTBadParameter;
bool paramsValid = false;

paramsValid = validateStruct( pMqttAgentContext, pCommandInfo ) &&
validateParams( PUBLISH, pPublishInfo );
validateParams( PUBLISH, pPublishArgs );

if( paramsValid )
{
statusReturn = createAndAddCommand( PUBLISH, /* commandType */
pMqttAgentContext, /* mqttContextHandle */
pPublishInfo, /* pMqttInfoParam */
pPublishArgs, /* pMqttInfoParam */
pCommandInfo->cmdCompleteCallback, /* commandCompleteCallback */
pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
pCommandInfo->blockTimeMs );
Expand Down Expand Up @@ -1296,18 +1313,20 @@ MQTTStatus_t MQTTAgent_Connect( const MQTTAgentContext_t * pMqttAgentContext,
/*-----------------------------------------------------------*/

MQTTStatus_t MQTTAgent_Disconnect( const MQTTAgentContext_t * pMqttAgentContext,
MQTTAgentDisconnectArgs_t * pDisconnectArgs,
const MQTTAgentCommandInfo_t * pCommandInfo )
{
MQTTStatus_t statusReturn = MQTTBadParameter;
bool paramsValid = false;

paramsValid = validateStruct( pMqttAgentContext, pCommandInfo );
paramsValid = validateStruct( pMqttAgentContext, pCommandInfo ) &&
validateParams(DISCONNECT , pDisconnectArgs );

if( paramsValid )
{
statusReturn = createAndAddCommand( DISCONNECT, /* commandType */
pMqttAgentContext, /* mqttContextHandle */
NULL, /* pMqttInfoParam */
pDisconnectArgs, /* pMqttInfoParam */
pCommandInfo->cmdCompleteCallback, /* commandCompleteCallback */
pCommandInfo->pCmdCompleteCallbackContext, /* pCommandCompleteCallbackContext */
pCommandInfo->blockTimeMs );
Expand Down
33 changes: 19 additions & 14 deletions source/core_mqtt_agent_command_functions.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,29 +58,29 @@ MQTTStatus_t MQTTAgentCommand_ProcessLoop( MQTTAgentContext_t * pMqttAgentContex
/*-----------------------------------------------------------*/

MQTTStatus_t MQTTAgentCommand_Publish( MQTTAgentContext_t * pMqttAgentContext,
void * pPublishArg,
void * pVoidPublishArgs,
MQTTAgentCommandFuncReturns_t * pReturnFlags )
{
const MQTTPublishInfo_t * pPublishInfo;
const MQTTAgentPublishArgs_t * pPublishArgs ;
MQTTStatus_t ret;

assert( pMqttAgentContext != NULL );
assert( pPublishArg != NULL );
assert( pPublishArgs != NULL );
assert( pReturnFlags != NULL );

( void ) memset( pReturnFlags, 0x00, sizeof( MQTTAgentCommandFuncReturns_t ) );
pPublishInfo = ( const MQTTPublishInfo_t * ) ( pPublishArg );
pPublishArgs = ( const MQTTAgentPublishArgs_t * ) ( pVoidPublishArgs );

if( pPublishInfo->qos != MQTTQoS0 )
if( pPublishArgs->pPublishInfo->qos != MQTTQoS0 )
{
pReturnFlags->packetId = MQTT_GetPacketId( &( pMqttAgentContext->mqttContext ) );
}

LogInfo( ( "Publishing message to %.*s.\n", ( int ) pPublishInfo->topicNameLength, pPublishInfo->pTopicName ) );
ret = MQTT_Publish( &( pMqttAgentContext->mqttContext ), pPublishInfo, pReturnFlags->packetId );
LogInfo( ( "Publishing message to %.*s.\n", ( int ) pPublishArgs->pPublishInfo->topicNameLength, pPublishArgs->pPublishInfo->pTopicName ) );
ret = MQTT_Publish( &( pMqttAgentContext->mqttContext ), pPublishArgs->pPublishInfo , pReturnFlags->packetId, pPublishArgs->pProperties );

/* Add to pending ack list, or call callback if QoS 0. */
pReturnFlags->addAcknowledgment = ( pPublishInfo->qos != MQTTQoS0 ) && ( ret == MQTTSuccess );
pReturnFlags->addAcknowledgment = ( pPublishArgs->pPublishInfo->qos != MQTTQoS0 ) && ( ret == MQTTSuccess );
pReturnFlags->runProcessLoop = true;

return ret;
Expand All @@ -106,7 +106,8 @@ MQTTStatus_t MQTTAgentCommand_Subscribe( MQTTAgentContext_t * pMqttAgentContext,
ret = MQTT_Subscribe( &( pMqttAgentContext->mqttContext ),
pSubscribeArgs->pSubscribeInfo,
pSubscribeArgs->numSubscriptions,
pReturnFlags->packetId );
pReturnFlags->packetId,
pSubscribeArgs->pProperties );

pReturnFlags->addAcknowledgment = ( ret == MQTTSuccess );
pReturnFlags->runProcessLoop = true;
Expand Down Expand Up @@ -134,7 +135,8 @@ MQTTStatus_t MQTTAgentCommand_Unsubscribe( MQTTAgentContext_t * pMqttAgentContex
ret = MQTT_Unsubscribe( &( pMqttAgentContext->mqttContext ),
pSubscribeArgs->pSubscribeInfo,
pSubscribeArgs->numSubscriptions,
pReturnFlags->packetId );
pReturnFlags->packetId,
pSubscribeArgs->pProperties );

pReturnFlags->addAcknowledgment = ( ret == MQTTSuccess );
pReturnFlags->runProcessLoop = true;
Expand All @@ -161,7 +163,9 @@ MQTTStatus_t MQTTAgentCommand_Connect( MQTTAgentContext_t * pMqttAgentContext,
pConnectInfo->pConnectInfo,
pConnectInfo->pWillInfo,
pConnectInfo->timeoutMs,
&( pConnectInfo->sessionPresent ) );
&( pConnectInfo->sessionPresent ),
pConnectInfo->pProperties,
pConnectInfo->pWillProperties );

/* Resume a session if one existed, else clear the list of acknowledgments. */
if( ret == MQTTSuccess )
Expand All @@ -179,17 +183,18 @@ MQTTStatus_t MQTTAgentCommand_Connect( MQTTAgentContext_t * pMqttAgentContext,
/*-----------------------------------------------------------*/

MQTTStatus_t MQTTAgentCommand_Disconnect( MQTTAgentContext_t * pMqttAgentContext,
void * pUnusedArg,
void * pVoidDisconnectArgs,
MQTTAgentCommandFuncReturns_t * pReturnFlags )
{
MQTTStatus_t ret;
MQTTAgentDisconnectArgs_t * pDisconnectArgs;

( void ) pUnusedArg;
pDisconnectArgs = ( MQTTAgentDisconnectArgs_t * ) ( pVoidDisconnectArgs );

assert( pMqttAgentContext != NULL );
assert( pReturnFlags != NULL );

ret = MQTT_Disconnect( &( pMqttAgentContext->mqttContext ) );
ret = MQTT_Disconnect( &( pMqttAgentContext->mqttContext ), pDisconnectArgs->pProperties, pDisconnectArgs->reasonCode );

( void ) memset( pReturnFlags, 0x00, sizeof( MQTTAgentCommandFuncReturns_t ) );
pReturnFlags->endLoop = true;
Expand Down
2 changes: 1 addition & 1 deletion source/dependency/coreMQTT
Submodule coreMQTT updated 47 files
+7 −0 .github/.cSpellWords.txt
+2 −2 .github/workflows/ci.yml
+0 −23 .github/workflows/formatting.yml
+7 −0 CTestTestfile.cmake
+0 −53 Dockerfile
+5 −0 MISRA.md
+1,646 −2 MigrationGuide.md
+13 −14 README.md
+1 −1 docs/doxygen/config.doxyfile
+6 −6 docs/doxygen/include/size_table.md
+271 −15 docs/doxygen/pages.dox
+2 −0 docs/doxygen/porting.dox
+ docs/plantuml/images/mqtt_connect_design.png
+ docs/plantuml/images/mqtt_processloop_design.png
+ docs/plantuml/images/mqtt_receiveloop_design.png
+0 −21 docs/plantuml/mqtt_connect_design.pu
+0 −32 docs/plantuml/mqtt_processloop_design.pu
+0 −30 docs/plantuml/mqtt_receiveloop_design.pu
+1 −1 manifest.yml
+3 −2 mqttFilePaths.cmake
+0 −13 requirements.txt
+1,293 −233 source/core_mqtt.c
+5,019 −649 source/core_mqtt_serializer.c
+66 −0 source/core_mqtt_utils.c
+244 −23 source/include/core_mqtt.h
+1 −1 source/include/core_mqtt_config_defaults.h
+2,094 −349 source/include/core_mqtt_serializer.h
+129 −0 source/include/core_mqtt_utils.h
+4 −0 source/interface/transport_interface.h
+9 −0 test/cbmc/include/mqtt_cbmc_state.h
+40 −0 test/cbmc/proofs/MQTT_GetBytesInMQTTVec/MQTT_GetBytesInMQTTVec_harness.c
+39 −0 test/cbmc/proofs/MQTT_GetBytesInMQTTVec/Makefile
+10 −0 test/cbmc/proofs/MQTT_GetBytesInMQTTVec/README.md
+1 −0 test/cbmc/proofs/MQTT_GetBytesInMQTTVec/cbmc-proof.txt
+7 −0 test/cbmc/proofs/MQTT_GetBytesInMQTTVec/cbmc-viewer.json
+52 −0 test/cbmc/proofs/MQTT_SerializeMQTTVec/MQTT_SerializeMQTTVec_harness.c
+40 −0 test/cbmc/proofs/MQTT_SerializeMQTTVec/Makefile
+10 −0 test/cbmc/proofs/MQTT_SerializeMQTTVec/README.md
+1 −0 test/cbmc/proofs/MQTT_SerializeMQTTVec/cbmc-proof.txt
+7 −0 test/cbmc/proofs/MQTT_SerializeMQTTVec/cbmc-viewer.json
+68 −1 test/cbmc/sources/mqtt_cbmc_state.c
+2 −1 test/unit-test/CMakeLists.txt
+3 −3 test/unit-test/core_mqtt_config.h
+3,678 −1,334 test/unit-test/core_mqtt_serializer_utest.c
+28 −10 test/unit-test/core_mqtt_state_utest.c
+2,289 −705 test/unit-test/core_mqtt_utest.c
+16 −11 tools/coverity/README.md
Loading
Loading