diff --git a/.gitignore b/.gitignore
index 5de810bd..00740fa5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,3 +14,5 @@ repl_state/
 venv/
 *.egg-info
 .DS_Store
+.prompt
+.kiro
\ No newline at end of file
diff --git a/README.md b/README.md
index eeaf8016..a4920295 100644
--- a/README.md
+++ b/README.md
@@ -52,6 +52,7 @@ Strands Agents Tools is a community-driven project that provides a powerful set
 - ⏱️ **Task Scheduling** - Schedule and manage cron jobs
 - 🧠 **Advanced Reasoning** - Tools for complex thinking and reasoning capabilities
 - 🐝 **Swarm Intelligence** - Coordinate multiple AI agents for parallel problem solving with shared memory
+- 🔌 **Dynamic MCP Client** - ⚠️ Dynamically connect to external MCP servers and load remote tools (use with caution - see security warnings)
 - 🔄 **Multiple tools in Parallel**  - Call multiple other tools at the same time in parallel with Batch Tool
 - 🔍 **Browser Tool** - Tool giving an agent access to perform automated actions on a browser (chromium)
 
@@ -126,6 +127,7 @@ Below is a comprehensive table of all available tools, how to use them with an a
 | handoff_to_user | `agent.tool.handoff_to_user(message="Please confirm action", breakout_of_loop=False)` | Hand off control to user for confirmation, input, or complete task handoff |
 | use_llm | `agent.tool.use_llm(prompt="Analyze this data", system_prompt="You are a data analyst")` | Create nested AI loops with customized system prompts for specialized tasks |
 | workflow | `agent.tool.workflow(action="create", name="data_pipeline", steps=[{"tool": "file_read"}, {"tool": "python_repl"}])` | Define, execute, and manage multi-step automated workflows |
+| dynamic_mcp_client | `agent.tool.dynamic_mcp_client(action="connect", connection_id="my_server", transport="stdio", command="python", args=["server.py"])` | ⚠️ **SECURITY WARNING**: Dynamically connect to external MCP servers via stdio, sse, or streamable_http, list tools, and call remote tools. This can pose security risks as agents may connect to malicious servers. Use with caution in production. |
 | batch| `agent.tool.batch(invocations=[{"name": "current_time", "arguments": {"timezone": "Europe/London"}}, {"name": "stop", "arguments": {}}])` | Call multiple other tools in parallel. |
 | browser | `browser = LocalChromiumBrowser(); agent = Agent(tools=[browser.browser])` | Web scraping, automated testing, form filling, web automation tasks |
 
@@ -146,6 +148,68 @@ agent.tool.file_write(path="output.txt", content="Hello, world!")
 agent.tool.editor(command="view", path="script.py")
 ```
 
+### Dynamic MCP Client Integration
+
+⚠️ **SECURITY WARNING**: The Dynamic MCP Client allows agents to autonomously connect to external MCP servers and load remote tools at runtime. This poses significant security risks as agents can potentially connect to malicious servers and execute untrusted code. Use with extreme caution in production environments.
+
+This tool is different from the static MCP server implementation in the Strands SDK (see [MCP Tools Documentation](https://github.com/strands-agents/docs/blob/main/docs/user-guide/concepts/tools/mcp-tools.md)) which uses pre-configured, trusted MCP servers.
+
+```python
+from strands import Agent
+from strands_tools import dynamic_mcp_client
+
+agent = Agent(tools=[dynamic_mcp_client])
+
+# Connect to a custom MCP server via stdio
+agent.tool.dynamic_mcp_client(
+    action="connect",
+    connection_id="my_tools",
+    transport="stdio",
+    command="python",
+    args=["my_mcp_server.py"]
+)
+
+# List available tools on the server
+tools = agent.tool.dynamic_mcp_client(
+    action="list_tools",
+    connection_id="my_tools"
+)
+
+# Call a tool from the MCP server
+result = agent.tool.dynamic_mcp_client(
+    action="call_tool",
+    connection_id="my_tools",
+    tool_name="calculate",
+    tool_args={"x": 10, "y": 20}
+)
+
+# Connect to a SSE-based server
+agent.tool.dynamic_mcp_client(
+    action="connect",
+    connection_id="web_server",
+    transport="sse",
+    server_url="http://localhost:8080/sse"
+)
+
+# Connect to a streamable HTTP server
+agent.tool.dynamic_mcp_client(
+    action="connect",
+    connection_id="http_server",
+    transport="streamable_http",
+    server_url="https://api.example.com/mcp",
+    headers={"Authorization": "Bearer token"},
+    timeout=60
+)
+
+# Load MCP tools into agent's registry for direct access
+# ⚠️ WARNING: This loads external tools directly into the agent
+agent.tool.dynamic_mcp_client(
+    action="load_tools",
+    connection_id="my_tools"
+)
+# Now you can call MCP tools directly as: agent.tool.calculate(x=10, y=20)
+```
+
 ### Shell Commands
 
 *Note: `shell` does not work on Windows.*
@@ -580,6 +644,12 @@ The Mem0 Memory Tool supports three different backend configurations:
 |----------------------|-------------|---------|
 | ENV_VARS_MASKED_DEFAULT | Default setting for masking sensitive values | true |
 
+#### Dynamic MCP Client Tool
+
+| Environment Variable | Description | Default | 
+|----------------------|-------------|---------|
+| STRANDS_MCP_TIMEOUT | Default timeout in seconds for MCP operations | 30.0 |
+
 #### File Read Tool
 
 | Environment Variable | Description | Default |
diff --git a/src/strands_tools/dynamic_mcp_client.py b/src/strands_tools/dynamic_mcp_client.py
new file mode 100644
index 00000000..7a15dfa4
--- /dev/null
+++ b/src/strands_tools/dynamic_mcp_client.py
@@ -0,0 +1,721 @@
+"""Dynamic MCP Client Tool for Strands Agents.
+
+⚠️ SECURITY WARNING: This tool allows agents to autonomously connect to external
+MCP servers and dynamically load remote tools. This poses security risks as agents
+can potentially connect to malicious servers and execute untrusted code. Use with
+caution in production environments.
+
+This tool provides a high-level interface for dynamically connecting to any MCP server
+and loading remote tools at runtime. This is different from the static MCP server
+implementation in the Strands SDK (see https://github.com/strands-agents/docs/blob/main/docs/user-guide/concepts/tools/mcp-tools.md).
+
+Key differences from SDK's MCP implementation:
+- This tool enables DYNAMIC connections to new MCP servers at runtime
+- Can autonomously discover and load external tools from untrusted sources
+- Tools are loaded into the agent's registry and can be called directly
+- Connections persist across multiple tool invocations
+- Supports multiple concurrent connections to different MCP servers
+
+It leverages the Strands SDK's MCPClient for robust connection management
+and implements a per-operation connection pattern for stability.
+"""
+
+import logging
+import os
+import time
+from dataclasses import dataclass
+from datetime import timedelta
+from threading import Lock
+from typing import Any, Dict, List, Optional
+
+from mcp import StdioServerParameters, stdio_client
+from mcp.client.sse import sse_client
+from mcp.client.streamable_http import streamablehttp_client
+from strands import tool
+from strands.tools.mcp import MCPClient
+from strands.types.tools import AgentTool, ToolGenerator, ToolSpec, ToolUse
+
+logger = logging.getLogger(__name__)
+
+# Default timeout for MCP operations - can be overridden via environment variable
+DEFAULT_MCP_TIMEOUT = float(os.environ.get("STRANDS_MCP_TIMEOUT", "30.0"))
+
+
+class MCPTool(AgentTool):
+    """Wrapper class for dynamically loaded MCP tools that extends AgentTool.
+
+    This class wraps MCP tools loaded through dynamic_mcp_client and ensures proper
+    connection management using the `with mcp_client:` context pattern used throughout
+    the dynamic MCP client. It handles both sync and async tool execution while
+    maintaining connection health and error handling.
+    """
+
+    def __init__(self, mcp_tool, connection_id: str):
+        """Initialize MCPTool wrapper.
+
+        Args:
+            mcp_tool: The underlying MCP tool instance from the SDK
+            connection_id: ID of the connection this tool belongs to
+        """
+        super().__init__()
+        self._mcp_tool = mcp_tool
+        self._connection_id = connection_id
+        logger.debug(f"MCPTool wrapper created for tool '{mcp_tool.tool_name}' on connection '{connection_id}'")
+
+    @property
+    def tool_name(self) -> str:
+        """Get the name of the tool."""
+        return self._mcp_tool.tool_name
+
+    @property
+    def tool_spec(self) -> ToolSpec:
+        """Get the specification of the tool."""
+        return self._mcp_tool.tool_spec
+
+    @property
+    def tool_type(self) -> str:
+        """Get the type of the tool."""
+        return "mcp_dynamic"
+
+    async def stream(self, tool_use: ToolUse, invocation_state: dict[str, Any], **kwargs: Any) -> ToolGenerator:
+        """Stream the MCP tool execution with proper connection management.
+
+        This method uses the same `with mcp_client:` context pattern as other
+        operations in dynamic_mcp_client to ensure proper connection management
+        and error handling.
+
+        Args:
+            tool_use: The tool use request containing tool ID and parameters.
+            invocation_state: Context for the tool invocation, including agent state.
+            **kwargs: Additional keyword arguments for future extensibility.
+
+        Yields:
+            Tool events with the last being the tool result.
+        """
+        logger.debug(
+            f"MCPTool executing tool '{self.tool_name}' on connection '{self._connection_id}' "
+            f"with tool_use_id '{tool_use['toolUseId']}'"
+        )
+
+        # Get connection info
+        config = _get_connection(self._connection_id)
+        if not config:
+            error_result = {
+                "toolUseId": tool_use["toolUseId"],
+                "status": "error",
+                "content": [{"text": f"Connection '{self._connection_id}' not found"}],
+            }
+            yield error_result
+            return
+
+        if not config.is_active:
+            error_result = {
+                "toolUseId": tool_use["toolUseId"],
+                "status": "error",
+                "content": [{"text": f"Connection '{self._connection_id}' is not active"}],
+            }
+            yield error_result
+            return
+
+        try:
+            # Use the same context pattern as other operations in dynamic_mcp_client
+            with config.mcp_client:
+                result = await config.mcp_client.call_tool_async(
+                    tool_use_id=tool_use["toolUseId"],
+                    name=self.tool_name,
+                    arguments=tool_use["input"],
+                )
+                yield result
+
+        except Exception as e:
+            logger.error(f"Error executing MCP tool '{self.tool_name}': {e}", exc_info=True)
+
+            # Mark connection as unhealthy if it fails
+            with _CONNECTION_LOCK:
+                config.is_active = False
+                config.last_error = str(e)
+
+            error_result = {
+                "toolUseId": tool_use["toolUseId"],
+                "status": "error",
+                "content": [{"text": f"Failed to execute tool '{self.tool_name}': {str(e)}"}],
+            }
+            yield error_result
+
+    def get_display_properties(self) -> dict[str, str]:
+        """Get properties to display in UI representations of this tool."""
+        base_props = super().get_display_properties()
+        base_props["Connection ID"] = self._connection_id
+        return base_props
+
+
+@dataclass
+class ConnectionInfo:
+    """Information about an MCP connection."""
+
+    connection_id: str
+    mcp_client: MCPClient
+    transport: str
+    url: str
+    register_time: float
+    is_active: bool = True
+    last_error: Optional[str] = None
+    loaded_tool_names: List[str] = None
+
+    def __post_init__(self):
+        """Initialize mutable defaults."""
+        if self.loaded_tool_names is None:
+            self.loaded_tool_names = []
+
+
+# Thread-safe connection storage
+_connections: Dict[str, ConnectionInfo] = {}
+_CONNECTION_LOCK = Lock()
+
+
+def _get_connection(connection_id: str) -> Optional[ConnectionInfo]:
+    """Get a connection by ID with thread safety."""
+    with _CONNECTION_LOCK:
+        return _connections.get(connection_id)
+
+
+def _validate_connection(connection_id: str, check_active: bool = False) -> Optional[Dict[str, Any]]:
+    """Validate that a connection exists and optionally check if it's active."""
+    if not connection_id:
+        return {"status": "error", "content": [{"text": "connection_id is required"}]}
+
+    config = _get_connection(connection_id)
+    if not config:
+        return {"status": "error", "content": [{"text": f"Connection '{connection_id}' not found"}]}
+
+    if check_active and not config.is_active:
+        return {"status": "error", "content": [{"text": f"Connection '{connection_id}' is not active"}]}
+
+    return None
+
+
+def _create_transport_callable(transport: str, **params):
+    """Create a transport callable based on the transport type and parameters."""
+    if transport == "stdio":
+        command = params.get("command")
+        if not command:
+            raise ValueError("command is required for stdio transport")
+        args = params.get("args", [])
+        env = params.get("env")
+        stdio_params = {"command": command, "args": args}
+        if env:
+            stdio_params["env"] = env
+        return lambda: stdio_client(StdioServerParameters(**stdio_params))
+
+    elif transport == "sse":
+        server_url = params.get("server_url")
+        if not server_url:
+            raise ValueError("server_url is required for SSE transport")
+        return lambda: sse_client(server_url)
+
+    elif transport == "streamable_http":
+        server_url = params.get("server_url")
+        if not server_url:
+            raise ValueError("server_url is required for streamable HTTP transport")
+
+        # Build streamable HTTP parameters
+        http_params = {"url": server_url}
+        if params.get("headers"):
+            http_params["headers"] = params["headers"]
+        if params.get("timeout"):
+            http_params["timeout"] = timedelta(seconds=params["timeout"])
+        if params.get("sse_read_timeout"):
+            http_params["sse_read_timeout"] = timedelta(seconds=params["sse_read_timeout"])
+        if params.get("terminate_on_close") is not None:
+            http_params["terminate_on_close"] = params["terminate_on_close"]
+        if params.get("auth"):
+            http_params["auth"] = params["auth"]
+
+        return lambda: streamablehttp_client(**http_params)
+
+    else:
+        raise ValueError(f"Unsupported transport: {transport}. Supported: stdio, sse, streamable_http")
+
+
+@tool
+def dynamic_mcp_client(
+    action: str,
+    server_config: Optional[Dict[str, Any]] = None,
+    connection_id: Optional[str] = None,
+    tool_name: Optional[str] = None,
+    tool_args: Optional[Dict[str, Any]] = None,
+    # Additional parameters that can be passed directly
+    transport: Optional[str] = None,
+    command: Optional[str] = None,
+    args: Optional[List[str]] = None,
+    env: Optional[Dict[str, str]] = None,
+    server_url: Optional[str] = None,
+    arguments: Optional[Dict[str, Any]] = None,
+    # New streamable HTTP parameters
+    headers: Optional[Dict[str, Any]] = None,
+    timeout: Optional[float] = None,
+    sse_read_timeout: Optional[float] = None,
+    terminate_on_close: Optional[bool] = None,
+    auth: Optional[Any] = None,
+    agent: Optional[Any] = None,  # Agent instance passed by SDK
+) -> Dict[str, Any]:
+    """
+    Dynamic MCP client tool for autonomously connecting to external MCP servers.
+
+    ⚠️ SECURITY WARNING: This tool enables agents to autonomously connect to external
+    MCP servers and dynamically load remote tools at runtime. This can pose significant
+    security risks as agents may connect to malicious servers or execute untrusted code.
+
+    Key Security Considerations:
+    - Agents can connect to ANY MCP server URL or command provided
+    - External tools are loaded directly into the agent's tool registry
+    - Loaded tools can execute arbitrary code with agent's permissions
+    - Connections persist and can be reused across multiple operations
+
+    This is different from the static MCP server configuration in the Strands SDK
+    (see https://github.com/strands-agents/docs/blob/main/docs/user-guide/concepts/tools/mcp-tools.md)
+    which uses pre-configured, trusted MCP servers.
+
+    Supports multiple actions for comprehensive MCP server management:
+    - connect: Establish connection to an MCP server
+    - list_tools: List available tools from a connected server
+    - disconnect: Close connection to an MCP server
+    - call_tool: Directly invoke a tool on a connected server
+    - list_connections: Show all active MCP connections
+    - load_tools: Load MCP tools into agent's tool registry for direct access
+
+    Args:
+        action: The action to perform (connect, list_tools, disconnect, call_tool, list_connections)
+        server_config: Configuration for MCP server connection (optional, can use direct parameters)
+        connection_id: Identifier for the MCP connection
+        tool_name: Name of tool to call (for call_tool action)
+        tool_args: Arguments to pass to tool (for call_tool action)
+        transport: Transport type (stdio, sse, or streamable_http) - can be passed directly instead of in server_config
+        command: Command for stdio transport - can be passed directly
+        args: Arguments for stdio command - can be passed directly
+        env: Environment variables for stdio command - can be passed directly
+        server_url: URL for SSE or streamable_http transport - can be passed directly
+        arguments: Alternative to tool_args for tool arguments
+        headers: HTTP headers for streamable_http transport (optional)
+        timeout: Timeout in seconds for HTTP operations in streamable_http transport (default: 30)
+        sse_read_timeout: SSE read timeout in seconds for streamable_http transport (default: 300)
+        terminate_on_close: Whether to terminate connection on close for streamable_http transport (default: True)
+        auth: Authentication object for streamable_http transport (httpx.Auth compatible)
+
+    Returns:
+        Dict with the result of the operation
+
+    Examples:
+        # Connect to custom stdio server with direct parameters
+        dynamic_mcp_client(
+            action="connect",
+            connection_id="my_server",
+            transport="stdio",
+            command="python",
+            args=["my_server.py"]
+        )
+
+        # Connect to streamable HTTP server
+        dynamic_mcp_client(
+            action="connect",
+            connection_id="http_server",
+            transport="streamable_http",
+            server_url="https://example.com/mcp",
+            headers={"Authorization": "Bearer token"},
+            timeout=60
+        )
+
+        # Call a tool directly with parameters
+        dynamic_mcp_client(
+            action="call_tool",
+            connection_id="my_server",
+            tool_name="calculator",
+            tool_args={"x": 10, "y": 20}
+        )
+    """
+
+    try:
+        # Prepare parameters for action handlers
+        params = {
+            "action": action,
+            "connection_id": connection_id,
+            "tool_name": tool_name,
+            "tool_args": tool_args or arguments,  # Support both parameter names
+            "agent": agent,  # Pass agent instance to handlers
+        }
+
+        # Handle server configuration - merge direct parameters with server_config
+        if action == "connect":
+            if server_config is None:
+                server_config = {}
+
+            # Direct parameters override server_config
+            if transport is not None:
+                params["transport"] = transport
+            elif "transport" in server_config:
+                params["transport"] = server_config["transport"]
+
+            if command is not None:
+                params["command"] = command
+            elif "command" in server_config:
+                params["command"] = server_config["command"]
+
+            if args is not None:
+                params["args"] = args
+            elif "args" in server_config:
+                params["args"] = server_config["args"]
+
+            if server_url is not None:
+                params["server_url"] = server_url
+            elif "server_url" in server_config:
+                params["server_url"] = server_config["server_url"]
+
+            if env is not None:
+                params["env"] = env
+            elif "env" in server_config:
+                params["env"] = server_config["env"]
+
+            # Streamable HTTP specific parameters
+            if headers is not None:
+                params["headers"] = headers
+            elif "headers" in server_config:
+                params["headers"] = server_config["headers"]
+
+            if timeout is not None:
+                params["timeout"] = timeout
+            elif "timeout" in server_config:
+                params["timeout"] = server_config["timeout"]
+
+            if sse_read_timeout is not None:
+                params["sse_read_timeout"] = sse_read_timeout
+            elif "sse_read_timeout" in server_config:
+                params["sse_read_timeout"] = server_config["sse_read_timeout"]
+
+            if terminate_on_close is not None:
+                params["terminate_on_close"] = terminate_on_close
+            elif "terminate_on_close" in server_config:
+                params["terminate_on_close"] = server_config["terminate_on_close"]
+
+            if auth is not None:
+                params["auth"] = auth
+            elif "auth" in server_config:
+                params["auth"] = server_config["auth"]
+
+        # Process the action
+        if action == "connect":
+            return _connect_to_server(params)
+        elif action == "disconnect":
+            return _disconnect_from_server(params)
+        elif action == "list_connections":
+            return _list_active_connections(params)
+        elif action == "list_tools":
+            return _list_server_tools(params)
+        elif action == "call_tool":
+            return _call_server_tool(params)
+        elif action == "load_tools":
+            return _load_tools_to_agent(params)
+        else:
+            return {
+                "status": "error",
+                "content": [
+                    {
+                        "text": f"Unknown action: {action}. Available actions: "
+                        "connect, disconnect, list_connections, list_tools, call_tool, load_tools"
+                    }
+                ],
+            }
+
+    except Exception as e:
+        logger.error(f"Error in dynamic_mcp_client: {e}", exc_info=True)
+        return {"status": "error", "content": [{"text": f"Error in dynamic_mcp_client: {str(e)}"}]}
+
+
+def _connect_to_server(params: Dict[str, Any]) -> Dict[str, Any]:
+    """Connect to an MCP server using SDK's MCPClient."""
+    connection_id = params.get("connection_id")
+    if not connection_id:
+        return {"status": "error", "content": [{"text": "connection_id is required for connect action"}]}
+
+    transport = params.get("transport", "stdio")
+
+    # Check if connection already exists
+    with _CONNECTION_LOCK:
+        if connection_id in _connections and _connections[connection_id].is_active:
+            return {
+                "status": "error",
+                "content": [{"text": f"Connection '{connection_id}' already exists and is active"}],
+            }
+
+    try:
+        # Create transport callable using the SDK pattern
+        params_copy = params.copy()
+        params_copy.pop("transport", None)  # Remove transport to avoid duplicate parameter
+        transport_callable = _create_transport_callable(transport, **params_copy)
+
+        # Create MCPClient using SDK
+        mcp_client = MCPClient(transport_callable)
+
+        # Test the connection by listing tools using the context manager
+        # The context manager handles starting and stopping the client
+        with mcp_client:
+            tools = mcp_client.list_tools_sync()
+            tool_count = len(tools)
+
+        # At this point, the client has been initialized and tested
+        # The connection is ready for future use
+
+        # Store connection info
+        url = params.get("server_url", f"{params.get('command', '')} {' '.join(params.get('args', []))}")
+        connection_info = ConnectionInfo(
+            connection_id=connection_id,
+            mcp_client=mcp_client,
+            transport=transport,
+            url=url,
+            register_time=time.time(),
+            is_active=True,
+        )
+
+        with _CONNECTION_LOCK:
+            _connections[connection_id] = connection_info
+
+        connection_result = {
+            "message": f"Connected to MCP server '{connection_id}'",
+            "connection_id": connection_id,
+            "transport": transport,
+            "tools_count": tool_count,
+            "available_tools": [tool.tool_name for tool in tools],
+        }
+
+        return {
+            "status": "success",
+            "content": [{"text": f"Connected to MCP server '{connection_id}'"}, {"json": connection_result}],
+        }
+
+    except Exception as e:
+        logger.error(f"Connection failed: {e}", exc_info=True)
+        return {"status": "error", "content": [{"text": f"Connection failed: {str(e)}"}]}
+
+
+def _disconnect_from_server(params: Dict[str, Any]) -> Dict[str, Any]:
+    """Disconnect from an MCP server and clean up loaded tools."""
+    connection_id = params.get("connection_id")
+    agent = params.get("agent")
+    error_result = _validate_connection(connection_id)
+    if error_result:
+        return error_result
+
+    try:
+        with _CONNECTION_LOCK:
+            config = _connections[connection_id]
+            loaded_tools = config.loaded_tool_names.copy()
+
+            # Remove connection
+            del _connections[connection_id]
+
+        # Clean up loaded tools from agent if agent is provided
+        cleanup_result = {"cleaned_tools": [], "failed_tools": []}
+        if agent and loaded_tools:
+            cleanup_result = _clean_up_tools_from_agent(agent, connection_id, loaded_tools)
+
+        disconnect_result = {
+            "message": f"Disconnected from MCP server '{connection_id}'",
+            "connection_id": connection_id,
+            "was_active": config.is_active,
+        }
+
+        if cleanup_result["cleaned_tools"]:
+            disconnect_result["cleaned_tools"] = cleanup_result["cleaned_tools"]
+            disconnect_result["cleaned_tools_count"] = len(cleanup_result["cleaned_tools"])
+
+        if cleanup_result["failed_tools"]:
+            disconnect_result["failed_to_clean_tools"] = cleanup_result["failed_tools"]
+            disconnect_result["failed_tools_count"] = len(cleanup_result["failed_tools"])
+
+        if loaded_tools and not agent:
+            disconnect_result["loaded_tools_info"] = (
+                f"Note: No agent provided, {len(loaded_tools)} tools loaded could not be cleaned up: "
+                f"{', '.join(loaded_tools)}"
+            )
+
+        return {
+            "status": "success",
+            "content": [{"text": f"Disconnected from MCP server '{connection_id}'"}, {"json": disconnect_result}],
+        }
+    except Exception as e:
+        return {"status": "error", "content": [{"text": f"Disconnect failed: {str(e)}"}]}
+
+
+def _list_active_connections(params: Dict[str, Any]) -> Dict[str, Any]:
+    """List all active MCP connections."""
+    with _CONNECTION_LOCK:
+        connections_info = []
+        for conn_id, config in _connections.items():
+            connections_info.append(
+                {
+                    "connection_id": conn_id,
+                    "transport": config.transport,
+                    "url": config.url,
+                    "is_active": config.is_active,
+                    "registered_at": config.register_time,
+                    "last_error": config.last_error,
+                    "loaded_tools_count": len(config.loaded_tool_names),
+                }
+            )
+
+        connections_result = {"total_connections": len(_connections), "connections": connections_info}
+
+        return {
+            "status": "success",
+            "content": [{"text": f"Found {len(_connections)} MCP connections"}, {"json": connections_result}],
+        }
+
+
+def _list_server_tools(params: Dict[str, Any]) -> Dict[str, Any]:
+    """List available tools from a connected MCP server."""
+    connection_id = params.get("connection_id")
+    error_result = _validate_connection(connection_id, check_active=True)
+    if error_result:
+        return error_result
+
+    try:
+        config = _get_connection(connection_id)
+        with config.mcp_client:
+            tools = config.mcp_client.list_tools_sync()
+
+        tools_info = []
+        for tool in tools:
+            tool_spec = tool.tool_spec
+            tools_info.append(
+                {
+                    "name": tool.tool_name,
+                    "description": tool_spec.get("description", ""),
+                    "input_schema": tool_spec.get("inputSchema", {}),
+                }
+            )
+
+        tools_result = {"connection_id": connection_id, "tools_count": len(tools), "tools": tools_info}
+
+        return {
+            "status": "success",
+            "content": [{"text": f"Found {len(tools)} tools on MCP server '{connection_id}'"}, {"json": tools_result}],
+        }
+    except Exception as e:
+        return {"status": "error", "content": [{"text": f"Failed to list tools: {str(e)}"}]}
+
+
+def _call_server_tool(params: Dict[str, Any]) -> Dict[str, Any]:
+    """Call a tool on a connected MCP server."""
+    connection_id = params.get("connection_id")
+    tool_name = params.get("tool_name")
+
+    if not tool_name:
+        return {"status": "error", "content": [{"text": "tool_name is required for call_tool action"}]}
+
+    error_result = _validate_connection(connection_id, check_active=True)
+    if error_result:
+        return error_result
+
+    try:
+        config = _get_connection(connection_id)
+        tool_args = params.get("tool_args", {})
+
+        with config.mcp_client:
+            # Use SDK's call_tool_sync which returns proper ToolResult
+            return config.mcp_client.call_tool_sync(
+                tool_use_id=f"dynamic_mcp_{connection_id}_{tool_name}", name=tool_name, arguments=tool_args
+            )
+    except Exception as e:
+        return {"status": "error", "content": [{"text": f"Failed to call tool: {str(e)}"}]}
+
+
+def _clean_up_tools_from_agent(agent, connection_id: str, tool_names: List[str]) -> Dict[str, Any]:
+    """Clean up tools loaded from a specific connection from the agent's tool registry."""
+    if not agent or not hasattr(agent, "tool_registry") or not hasattr(agent.tool_registry, "unregister_tool"):
+        return {
+            "cleaned_tools": [],
+            "failed_tools": tool_names if tool_names else [],
+            "error": "Agent does not support tool unregistration",
+        }
+
+    cleaned_tools = []
+    failed_tools = []
+
+    for tool_name in tool_names:
+        try:
+            agent.tool_registry.unregister_tool(tool_name)
+            cleaned_tools.append(tool_name)
+        except Exception as e:
+            failed_tools.append(f"{tool_name} ({str(e)})")
+
+    return {"cleaned_tools": cleaned_tools, "failed_tools": failed_tools}
+
+
+def _load_tools_to_agent(params: Dict[str, Any]) -> Dict[str, Any]:
+    """Load MCP tools into agent's tool registry using MCPTool wrapper."""
+    connection_id = params.get("connection_id")
+    agent = params.get("agent")
+
+    if not agent:
+        return {"status": "error", "content": [{"text": "agent instance is required for load_tools action"}]}
+
+    error_result = _validate_connection(connection_id, check_active=True)
+    if error_result:
+        return error_result
+
+    # Check if agent has tool_registry
+    if not hasattr(agent, "tool_registry") or not hasattr(agent.tool_registry, "register_tool"):
+        return {
+            "status": "error",
+            "content": [
+                {"text": "Agent does not have a tool registry. Make sure you're using a compatible Strands agent."}
+            ],
+        }
+
+    try:
+        config = _get_connection(connection_id)
+
+        with config.mcp_client:
+            # Use SDK's list_tools_sync which returns MCPAgentTool instances
+            tools = config.mcp_client.list_tools_sync()
+
+        loaded_tools = []
+        skipped_tools = []
+
+        for tool in tools:
+            try:
+                # Wrap the MCP tool with our MCPTool class that handles context management
+                wrapped_tool = MCPTool(tool, connection_id)
+
+                # Register the wrapped tool with the agent
+                logger.info(f"Loading MCP tool [{tool.tool_name}] wrapped in MCPTool")
+                agent.tool_registry.register_tool(wrapped_tool)
+                loaded_tools.append(tool.tool_name)
+
+            except Exception as e:
+                skipped_tools.append({"name": tool.tool_name, "error": str(e)})
+
+        # Update loaded tools list
+        with _CONNECTION_LOCK:
+            config.loaded_tool_names.extend(loaded_tools)
+
+        load_result = {
+            "message": f"Loaded {len(loaded_tools)} tools from MCP server '{connection_id}'",
+            "connection_id": connection_id,
+            "loaded_tools": loaded_tools,
+            "tool_count": len(loaded_tools),  # Add this field for test compatibility
+            "total_loaded_tools": len(config.loaded_tool_names),
+        }
+
+        if skipped_tools:
+            load_result["skipped_tools"] = skipped_tools
+
+        return {
+            "status": "success",
+            "content": [
+                {"text": f"Loaded {len(loaded_tools)} tools from MCP server '{connection_id}'"},
+                {"json": load_result},
+            ],
+        }
+
+    except Exception as e:
+        return {"status": "error", "content": [{"text": f"Failed to load tools: {str(e)}"}]}
diff --git a/tests/test_dynamic_mcp_client.py b/tests/test_dynamic_mcp_client.py
new file mode 100644
index 00000000..bd2e2b23
--- /dev/null
+++ b/tests/test_dynamic_mcp_client.py
@@ -0,0 +1,1023 @@
+"""
+Tests for the Dynamic MCP client tool.
+
+These tests directly call the dynamic_mcp_client function rather than going through
+the Agent interface for simpler and more focused testing.
+"""
+
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+from strands_tools.dynamic_mcp_client import ConnectionInfo, MCPTool, _connections, dynamic_mcp_client
+
+
+@pytest.fixture
+def mock_mcp_client():
+    """Mock the MCPClient class for testing."""
+    with patch("strands_tools.dynamic_mcp_client.MCPClient") as mock_client_class:
+        # Create a mock instance
+        mock_instance = MagicMock()
+        mock_client_class.return_value = mock_instance
+
+        # Setup context manager behavior
+        mock_instance.__enter__ = MagicMock(return_value=mock_instance)
+        mock_instance.__exit__ = MagicMock(return_value=None)
+
+        # Setup default tool list
+        mock_tool1 = MagicMock()
+        mock_tool1.tool_name = "test_tool"
+        mock_tool1.tool_spec = {
+            "description": "A test tool",
+            "inputSchema": {"json": {"type": "object", "properties": {"param": {"type": "string"}}}},
+        }
+
+        mock_instance.list_tools_sync.return_value = [mock_tool1]
+
+        # Setup tool call response
+        mock_instance.call_tool_sync.return_value = {
+            "status": "success",
+            "toolUseId": "test-id",
+            "content": [{"text": "Tool executed successfully"}],
+        }
+
+        yield mock_client_class, mock_instance
+
+
+@pytest.fixture
+def mock_stdio_client():
+    """Mock stdio_client for testing."""
+    with patch("strands_tools.dynamic_mcp_client.stdio_client") as mock_stdio:
+        mock_stdio.return_value = MagicMock()
+        yield mock_stdio
+
+
+@pytest.fixture
+def mock_sse_client():
+    """Mock sse_client for testing."""
+    with patch("strands_tools.dynamic_mcp_client.sse_client") as mock_sse:
+        mock_sse.return_value = MagicMock()
+        yield mock_sse
+
+
+@pytest.fixture
+def mock_streamablehttp_client():
+    """Mock streamablehttp_client for testing."""
+    with patch("strands_tools.dynamic_mcp_client.streamablehttp_client") as mock_streamable:
+        mock_streamable.return_value = MagicMock()
+        yield mock_streamable
+
+
+@pytest.fixture(autouse=True)
+def reset_connections():
+    """Reset the connections dictionary between tests."""
+    # Clear all connections
+    _connections.clear()
+
+    yield _connections
+
+
+class TestMCPClientConnect:
+    """Test connection-related functionality."""
+
+    def test_connect_stdio_transport(self, mock_mcp_client, mock_stdio_client):
+        """Test connecting to an MCP server via stdio transport."""
+        result = dynamic_mcp_client(
+            action="connect", connection_id="test_server", transport="stdio", command="python", args=["server.py"]
+        )
+
+        assert result["status"] == "success"
+        # Text message
+        assert "Connected to MCP server 'test_server'" in result["content"][0]["text"]
+
+        # Structured data
+        connection_data = result["content"][1]["json"]
+        assert "Connected to MCP server 'test_server'" in connection_data["message"]
+        assert connection_data["connection_id"] == "test_server"
+        assert connection_data["transport"] == "stdio"
+        assert connection_data["tools_count"] == 1
+        assert connection_data["available_tools"] == ["test_tool"]
+
+        # Verify MCPClient was created correctly
+        mock_client_class, mock_instance = mock_mcp_client
+        mock_client_class.assert_called_once()
+        mock_instance.list_tools_sync.assert_called_once()
+
+    def test_connect_streamable_http_transport(self, mock_mcp_client, mock_streamablehttp_client):
+        """Test connecting to an MCP server via streamable HTTP transport."""
+        result = dynamic_mcp_client(
+            action="connect",
+            connection_id="http_server",
+            transport="streamable_http",
+            server_url="https://example.com/mcp",
+            headers={"Authorization": "Bearer token123"},
+            timeout=60,
+            sse_read_timeout=180,
+        )
+
+        assert result["status"] == "success"
+        # Text message
+        assert "Connected to MCP server 'http_server'" in result["content"][0]["text"]
+
+        # Structured data
+        connection_data = result["content"][1]["json"]
+        assert "Connected to MCP server 'http_server'" in connection_data["message"]
+        assert connection_data["connection_id"] == "http_server"
+        assert connection_data["transport"] == "streamable_http"
+        assert connection_data["tools_count"] == 1
+        assert connection_data["available_tools"] == ["test_tool"]
+
+        # Verify MCPClient was created correctly
+        mock_client_class, mock_instance = mock_mcp_client
+        mock_client_class.assert_called_once()
+        mock_instance.list_tools_sync.assert_called_once()
+
+    def test_connect_streamable_http_minimal_params(self, mock_mcp_client, mock_streamablehttp_client):
+        """Test connecting to streamable HTTP server with minimal parameters."""
+        result = dynamic_mcp_client(
+            action="connect",
+            connection_id="simple_http",
+            transport="streamable_http",
+            server_url="https://api.example.com/mcp",
+        )
+
+        assert result["status"] == "success"
+        # Access structured data from new ToolResult format
+        connection_data = result["content"][1]["json"]
+        assert connection_data["transport"] == "streamable_http"
+
+    def test_connect_streamable_http_missing_url(self):
+        """Test connecting to streamable HTTP without server_url."""
+        result = dynamic_mcp_client(action="connect", connection_id="test", transport="streamable_http")
+        assert result["status"] == "error"
+        assert "server_url is required for streamable HTTP transport" in result["content"][0]["text"]
+
+    def test_connect_streamable_http_with_auth(self, mock_mcp_client, mock_streamablehttp_client):
+        """Test connecting to streamable HTTP server with authentication."""
+        # Mock httpx auth object
+        mock_auth = MagicMock()
+
+        result = dynamic_mcp_client(
+            action="connect",
+            connection_id="auth_http_server",
+            transport="streamable_http",
+            server_url="https://secure.example.com/mcp",
+            auth=mock_auth,
+            headers={"User-Agent": "Test-Client/1.0"},
+        )
+
+        assert result["status"] == "success"
+        # Access structured data from new ToolResult format
+        connection_data = result["content"][1]["json"]
+        assert connection_data["connection_id"] == "auth_http_server"
+
+    def test_connect_streamable_http_server_config(self, mock_mcp_client, mock_streamablehttp_client):
+        """Test connecting using server_config with streamable HTTP parameters."""
+        result = dynamic_mcp_client(
+            action="connect",
+            connection_id="config_http_server",
+            server_config={
+                "transport": "streamable_http",
+                "server_url": "https://config.example.com/mcp",
+                "headers": {"X-API-Key": "secret123"},
+                "timeout": 45,
+                "sse_read_timeout": 240,
+                "terminate_on_close": False,
+            },
+        )
+
+        assert result["status"] == "success"
+        # Access structured data from new ToolResult format
+        connection_data = result["content"][1]["json"]
+        assert connection_data["connection_id"] == "config_http_server"
+
+    def test_connect_sse_transport(self, mock_mcp_client, mock_sse_client):
+        """Test connecting to an MCP server via SSE transport."""
+        result = dynamic_mcp_client(
+            action="connect", connection_id="sse_server", transport="sse", server_url="http://localhost:8080/mcp"
+        )
+
+        assert result["status"] == "success"
+        # Access structured data from new ToolResult format
+        connection_data = result["content"][1]["json"]
+        assert "Connected to MCP server 'sse_server'" in connection_data["message"]
+        assert connection_data["connection_id"] == "sse_server"
+        assert connection_data["transport"] == "sse"
+
+    def test_connect_unsupported_transport(self):
+        """Test connecting with an unsupported transport type."""
+        result = dynamic_mcp_client(action="connect", connection_id="test", transport="unsupported_transport")
+        assert result["status"] == "error"
+        assert "Connection failed" in result["content"][0]["text"]
+        assert "Unsupported transport: unsupported_transport" in result["content"][0]["text"]
+
+    def test_connect_missing_required_params(self):
+        """Test connecting with missing required parameters."""
+        # Missing connection_id
+        result = dynamic_mcp_client(action="connect", transport="stdio", command="python")
+        assert result["status"] == "error"
+        assert "connection_id is required" in result["content"][0]["text"]
+
+        # Missing command for stdio
+        result = dynamic_mcp_client(action="connect", connection_id="test", transport="stdio")
+        assert result["status"] == "error"
+        assert "command is required for stdio transport" in result["content"][0]["text"]
+
+        # Missing server_url for SSE
+        result = dynamic_mcp_client(action="connect", connection_id="test", transport="sse")
+        assert result["status"] == "error"
+        assert "server_url is required for SSE transport" in result["content"][0]["text"]
+
+        # Missing server_url for streamable HTTP
+        result = dynamic_mcp_client(action="connect", connection_id="test", transport="streamable_http")
+        assert result["status"] == "error"
+        assert "server_url is required for streamable HTTP transport" in result["content"][0]["text"]
+
+    def test_connect_duplicate_connection(self, mock_mcp_client, mock_stdio_client):
+        """Test connecting with an existing connection ID."""
+        # First connection
+        dynamic_mcp_client(
+            action="connect", connection_id="test_server", transport="stdio", command="python", args=["server.py"]
+        )
+
+        # Try to connect again with same ID
+        result = dynamic_mcp_client(
+            action="connect", connection_id="test_server", transport="stdio", command="python", args=["server.py"]
+        )
+
+        assert result["status"] == "error"
+        assert "already exists and is active" in result["content"][0]["text"]
+
+    def test_connect_with_server_config(self, mock_mcp_client, mock_stdio_client):
+        """Test connecting using server_config parameter."""
+        result = dynamic_mcp_client(
+            action="connect",
+            connection_id="test_server",
+            server_config={"transport": "stdio", "command": "python", "args": ["server.py"]},
+        )
+
+        assert result["status"] == "success"
+        # Access structured data from new ToolResult format
+        connection_data = result["content"][1]["json"]
+        assert connection_data["connection_id"] == "test_server"
+
+    def test_connect_with_environment_variables(self, mock_mcp_client, mock_stdio_client):
+        """Test connecting with environment variables (parameters passed but not stored)."""
+        result = dynamic_mcp_client(
+            action="connect",
+            connection_id="test_server_with_env",
+            transport="stdio",
+            command="python",
+            args=["server.py"],
+            env={"API_KEY": "test-key-123", "DEBUG": "true"},
+        )
+
+        assert result["status"] == "success"
+        # Access structured data from new ToolResult format
+        connection_data = result["content"][1]["json"]
+        assert connection_data["connection_id"] == "test_server_with_env"
+
+    def test_connect_with_env_in_server_config(self, mock_mcp_client, mock_stdio_client):
+        """Test connecting with environment variables in server_config (parameters passed but not stored)."""
+        result = dynamic_mcp_client(
+            action="connect",
+            connection_id="test_server_config_env",
+            server_config={
+                "transport": "stdio",
+                "command": "npx",
+                "args": ["-y", "server-perplexity-ask"],
+                "env": {"PERPLEXITY_API_KEY": "pplx-test-key"},
+            },
+        )
+
+        assert result["status"] == "success"
+        # Access structured data from new ToolResult format
+        connection_data = result["content"][1]["json"]
+        assert connection_data["connection_id"] == "test_server_config_env"
+
+    def test_connect_failure(self, mock_mcp_client):
+        """Test handling connection failure."""
+        # Make list_tools_sync raise an exception
+        _, mock_instance = mock_mcp_client
+        mock_instance.list_tools_sync.side_effect = Exception("Connection failed")
+
+        result = dynamic_mcp_client(
+            action="connect",
+            connection_id="failing_server",
+            transport="stdio",
+            command="python",
+            args=["failing_server.py"],
+        )
+
+        assert result["status"] == "error"
+        assert "Connection failed" in result["content"][0]["text"]
+
+
+class TestMCPClientDisconnect:
+    """Test disconnection functionality."""
+
+    def test_disconnect_active_connection(self, mock_mcp_client, mock_stdio_client):
+        """Test disconnecting from an active connection."""
+        # First connect
+        dynamic_mcp_client(
+            action="connect", connection_id="test_server", transport="stdio", command="python", args=["server.py"]
+        )
+
+        # Then disconnect
+        result = dynamic_mcp_client(action="disconnect", connection_id="test_server")
+
+        assert result["status"] == "success"
+        # Text message
+        assert "Disconnected from MCP server 'test_server'" in result["content"][0]["text"]
+
+        # Structured data
+        disconnect_data = result["content"][1]["json"]
+        assert "Disconnected from MCP server 'test_server'" in disconnect_data["message"]
+        assert disconnect_data["was_active"] is True
+
+    def test_disconnect_nonexistent_connection(self):
+        """Test disconnecting from a non-existent connection."""
+        result = dynamic_mcp_client(action="disconnect", connection_id="nonexistent")
+
+        assert result["status"] == "error"
+        assert "Connection 'nonexistent' not found" in result["content"][0]["text"]
+
+    def test_disconnect_missing_connection_id(self):
+        """Test disconnecting without providing connection_id."""
+        result = dynamic_mcp_client(action="disconnect")
+
+        assert result["status"] == "error"
+        assert "connection_id is required" in result["content"][0]["text"]
+
+    def test_disconnect_with_loaded_tools(self, mock_mcp_client, mock_stdio_client, reset_connections):
+        """Test disconnecting from a connection with loaded tools."""
+        # Connect
+        dynamic_mcp_client(
+            action="connect", connection_id="test_server", transport="stdio", command="python", args=["server.py"]
+        )
+
+        # Track some loaded tools by updating the connection info
+        if "test_server" in _connections:
+            _connections["test_server"].loaded_tool_names = ["tool1", "tool2"]
+
+        # Disconnect
+        result = dynamic_mcp_client(action="disconnect", connection_id="test_server")
+
+        assert result["status"] == "success"
+        # Access structured data from new ToolResult format
+        disconnect_data = result["content"][1]["json"]
+        assert "loaded_tools_info" in disconnect_data
+        assert "2 tools loaded" in disconnect_data["loaded_tools_info"]
+        assert "tool1" in disconnect_data["loaded_tools_info"]
+
+
+class TestMCPClientListConnections:
+    """Test listing connections functionality."""
+
+    def test_list_empty_connections(self):
+        """Test listing connections when none exist."""
+        result = dynamic_mcp_client(action="list_connections")
+
+        assert result["status"] == "success"
+        # Text message
+        assert "Found 0 MCP connections" in result["content"][0]["text"]
+
+        # Structured data
+        connections_data = result["content"][1]["json"]
+        assert connections_data["total_connections"] == 0
+        assert connections_data["connections"] == []
+
+    def test_list_multiple_connections(
+        self, mock_mcp_client, mock_stdio_client, mock_sse_client, mock_streamablehttp_client
+    ):
+        """Test listing multiple connections."""
+        # Create multiple connections
+        dynamic_mcp_client(
+            action="connect", connection_id="stdio_server", transport="stdio", command="python", args=["server1.py"]
+        )
+
+        dynamic_mcp_client(
+            action="connect", connection_id="sse_server", transport="sse", server_url="http://localhost:8080/mcp"
+        )
+
+        dynamic_mcp_client(
+            action="connect",
+            connection_id="http_server",
+            transport="streamable_http",
+            server_url="https://example.com/mcp",
+        )
+
+        # List connections
+        result = dynamic_mcp_client(action="list_connections")
+
+        assert result["status"] == "success"
+        # Access structured data from new ToolResult format
+        connections_data = result["content"][1]["json"]
+        assert connections_data["total_connections"] == 3
+        assert len(connections_data["connections"]) == 3
+
+        # Verify connection details
+        conn_ids = [conn["connection_id"] for conn in connections_data["connections"]]
+        assert "stdio_server" in conn_ids
+        assert "sse_server" in conn_ids
+        assert "http_server" in conn_ids
+
+        # Check stdio connection
+        stdio_conn = next(c for c in connections_data["connections"] if c["connection_id"] == "stdio_server")
+        assert stdio_conn["transport"] == "stdio"
+        assert stdio_conn["is_active"] is True
+
+        # Check SSE connection
+        sse_conn = next(c for c in connections_data["connections"] if c["connection_id"] == "sse_server")
+        assert sse_conn["transport"] == "sse"
+        assert sse_conn["url"] == "http://localhost:8080/mcp"
+
+        # Check streamable HTTP connection
+        http_conn = next(c for c in connections_data["connections"] if c["connection_id"] == "http_server")
+        assert http_conn["transport"] == "streamable_http"
+        assert http_conn["url"] == "https://example.com/mcp"
+
+
+class TestMCPClientListTools:
+    """Test listing tools functionality."""
+
+    def test_list_tools_success(self, mock_mcp_client, mock_stdio_client):
+        """Test listing tools from a connected server."""
+        # Connect first
+        dynamic_mcp_client(
+            action="connect", connection_id="test_server", transport="stdio", command="python", args=["server.py"]
+        )
+
+        # List tools
+        result = dynamic_mcp_client(action="list_tools", connection_id="test_server")
+
+        assert result["status"] == "success"
+        # Text message
+        assert "Found 1 tools on MCP server 'test_server'" in result["content"][0]["text"]
+
+        # Structured data
+        tools_data = result["content"][1]["json"]
+        assert tools_data["connection_id"] == "test_server"
+        assert tools_data["tools_count"] == 1
+        assert len(tools_data["tools"]) == 1
+        assert tools_data["tools"][0]["name"] == "test_tool"
+        assert tools_data["tools"][0]["description"] == "A test tool"
+
+    def test_list_tools_nonexistent_connection(self):
+        """Test listing tools from a non-existent connection."""
+        result = dynamic_mcp_client(action="list_tools", connection_id="nonexistent")
+
+        assert result["status"] == "error"
+        assert "Connection 'nonexistent' not found" in result["content"][0]["text"]
+
+    def test_list_tools_missing_connection_id(self):
+        """Test listing tools without providing connection_id."""
+        result = dynamic_mcp_client(action="list_tools")
+
+        assert result["status"] == "error"
+        assert "connection_id is required" in result["content"][0]["text"]
+
+    def test_list_tools_connection_failure(self, mock_mcp_client, mock_stdio_client):
+        """Test handling errors when listing tools."""
+        # Connect first
+        dynamic_mcp_client(
+            action="connect", connection_id="test_server", transport="stdio", command="python", args=["server.py"]
+        )
+
+        # Make list_tools_sync fail
+        _, mock_instance = mock_mcp_client
+        mock_instance.list_tools_sync.side_effect = Exception("Server error")
+
+        # Try to list tools
+        result = dynamic_mcp_client(action="list_tools", connection_id="test_server")
+
+        assert result["status"] == "error"
+        assert "Failed to list tools" in result["content"][0]["text"]
+        assert "Server error" in result["content"][0]["text"]
+
+
+class TestMCPClientCallTool:
+    """Test calling tools functionality."""
+
+    def test_call_tool_success(self, mock_mcp_client, mock_stdio_client):
+        """Test successfully calling a tool."""
+        # Connect first
+        dynamic_mcp_client(
+            action="connect", connection_id="test_server", transport="stdio", command="python", args=["server.py"]
+        )
+
+        # Call a tool
+        result = dynamic_mcp_client(
+            action="call_tool", connection_id="test_server", tool_name="test_tool", tool_args={"param": "value"}
+        )
+
+        assert result["status"] == "success"
+        assert result["toolUseId"] == "test-id"
+        assert result["content"][0]["text"] == "Tool executed successfully"
+
+    def test_call_tool_with_direct_params(self, mock_mcp_client, mock_stdio_client):
+        """Test calling a tool with parameters passed directly - they should be explicitly provided in tool_args."""
+        # Connect first
+        dynamic_mcp_client(
+            action="connect", connection_id="test_server", transport="stdio", command="python", args=["server.py"]
+        )
+
+        # Call tool with direct parameters - should now use tool_args explicitly
+        result = dynamic_mcp_client(
+            action="call_tool",
+            connection_id="test_server",
+            tool_name="test_tool",
+            tool_args={"param": "direct_value", "another_param": 123},
+        )
+
+        assert result["status"] == "success"
+        # Verify the SDK call was made with the correct arguments
+        _, mock_instance = mock_mcp_client
+        mock_instance.call_tool_sync.assert_called_with(
+            tool_use_id="dynamic_mcp_test_server_test_tool",
+            name="test_tool",
+            arguments={"param": "direct_value", "another_param": 123},
+        )
+
+    def test_call_tool_missing_params(self):
+        """Test calling a tool with missing parameters."""
+        # Missing connection_id
+        result = dynamic_mcp_client(action="call_tool", tool_name="test_tool")
+        assert result["status"] == "error"
+        assert "connection_id is required" in result["content"][0]["text"]
+
+        # Missing tool_name
+        result = dynamic_mcp_client(action="call_tool", connection_id="test_server")
+        assert result["status"] == "error"
+        assert "tool_name is required" in result["content"][0]["text"]
+
+    def test_call_tool_error(self, mock_mcp_client, mock_stdio_client):
+        """Test handling errors when calling a tool."""
+        # Connect first
+        dynamic_mcp_client(
+            action="connect", connection_id="test_server", transport="stdio", command="python", args=["server.py"]
+        )
+
+        # Make call_tool_sync fail
+        _, mock_instance = mock_mcp_client
+        mock_instance.call_tool_sync.side_effect = Exception("Tool execution failed")
+
+        # Try to call tool
+        result = dynamic_mcp_client(action="call_tool", connection_id="test_server", tool_name="test_tool")
+
+        assert result["status"] == "error"
+        assert "Failed to call tool" in result["content"][0]["text"]
+        assert "Tool execution failed" in result["content"][0]["text"]
+
+
+class TestMCPClientLoadTools:
+    """Test loading tools functionality."""
+
+    def test_load_tools_success(self, mock_mcp_client, mock_stdio_client):
+        """Test successfully loading tools into agent with MCPTool wrapper."""
+        # Mock agent's tool_registry
+        mock_agent = MagicMock()
+        mock_registry = MagicMock()
+        mock_registry.register_tool = MagicMock()
+        mock_agent.tool_registry = mock_registry
+
+        # Connect first
+        dynamic_mcp_client(
+            action="connect", connection_id="test_server", transport="stdio", command="python", args=["server.py"]
+        )
+
+        # Load tools
+        result = dynamic_mcp_client(action="load_tools", connection_id="test_server", agent=mock_agent)
+
+        assert result["status"] == "success"
+        # Text message
+        assert "Loaded 1 tools from MCP server 'test_server'" in result["content"][0]["text"]
+
+        # Structured data
+        load_data = result["content"][1]["json"]
+        assert "Loaded 1 tools" in load_data["message"]
+        assert load_data["loaded_tools"] == ["test_tool"]
+        assert load_data["tool_count"] == 1
+
+        # Verify tool was registered as MCPTool wrapper
+        mock_registry.register_tool.assert_called_once()
+        registered_tool = mock_registry.register_tool.call_args[0][0]
+        assert isinstance(registered_tool, MCPTool)
+        assert registered_tool.tool_name == "test_tool"
+        assert registered_tool._connection_id == "test_server"
+
+    def test_load_tools_no_agent(self, mock_mcp_client, mock_stdio_client):
+        """Test loading tools without agent instance."""
+        # Connect first to ensure we get to the agent check
+        dynamic_mcp_client(
+            action="connect", connection_id="test_server", transport="stdio", command="python", args=["server.py"]
+        )
+
+        result = dynamic_mcp_client(action="load_tools", connection_id="test_server")
+
+        assert result["status"] == "error"
+        # The current implementation expects this specific message
+        assert "agent instance is required" in result["content"][0]["text"]
+
+    def test_load_tools_no_registry(self, mock_mcp_client, mock_stdio_client):
+        """Test loading tools with agent that has no tool registry."""
+        # Create a mock agent without tool_registry
+        mock_agent = MagicMock(spec=[])  # No attributes
+
+        # Connect first
+        dynamic_mcp_client(
+            action="connect", connection_id="test_server", transport="stdio", command="python", args=["server.py"]
+        )
+
+        # Try to load tools
+        result = dynamic_mcp_client(action="load_tools", connection_id="test_server", agent=mock_agent)
+
+        assert result["status"] == "error"
+        assert "Agent does not have a tool registry" in result["content"][0]["text"]
+
+    def test_load_tools_inactive_connection(self, reset_connections):
+        """Test loading tools from an inactive connection."""
+        # Manually register an inactive connection
+        mock_client = MagicMock()
+        config = ConnectionInfo(
+            connection_id="inactive_server",
+            mcp_client=mock_client,
+            transport="stdio",
+            url="python server.py",
+            register_time=0,
+            is_active=False,
+        )
+        _connections["inactive_server"] = config
+
+        # Mock agent
+        mock_agent = MagicMock()
+
+        # Try to load tools
+        result = dynamic_mcp_client(action="load_tools", connection_id="inactive_server", agent=mock_agent)
+
+        assert result["status"] == "error"
+        assert "Connection 'inactive_server' is not active" in result["content"][0]["text"]
+
+    def test_load_tools_with_errors(self, mock_mcp_client, mock_stdio_client):
+        """Test loading tools with some registration failures."""
+        # Setup multiple tools
+        mock_tool1 = MagicMock()
+        mock_tool1.tool_name = "tool1"
+        mock_tool1.tool_spec = {"description": "Tool 1"}
+
+        mock_tool2 = MagicMock()
+        mock_tool2.tool_name = "tool2"
+        mock_tool2.tool_spec = {"description": "Tool 2"}
+
+        _, mock_instance = mock_mcp_client
+        mock_instance.list_tools_sync.return_value = [mock_tool1, mock_tool2]
+
+        mock_agent = MagicMock()
+        mock_registry = MagicMock()
+        mock_registry.register_tool.side_effect = [None, Exception("Registration failed")]
+        mock_agent.tool_registry = mock_registry
+
+        # Connect
+        dynamic_mcp_client(
+            action="connect", connection_id="test_server", transport="stdio", command="python", args=["server.py"]
+        )
+
+        # Load tools
+        result = dynamic_mcp_client(action="load_tools", connection_id="test_server", agent=mock_agent)
+
+        assert result["status"] == "success"
+        # Access structured data from new ToolResult format
+        load_data = result["content"][1]["json"]
+        assert "Loaded 1 tools" in load_data["message"]
+        assert load_data["loaded_tools"] == ["tool1"]
+        assert "skipped_tools" in load_data
+        assert load_data["skipped_tools"][0]["name"] == "tool2"
+        assert "Registration failed" in load_data["skipped_tools"][0]["error"]
+
+
+class TestMCPToolClass:
+    """Test the MCPTool wrapper class."""
+
+    @pytest.fixture
+    def mock_mcp_tool(self):
+        """Create a mock MCP tool."""
+        mock_tool = MagicMock()
+        mock_tool.tool_name = "test_tool"
+        mock_tool.tool_spec = {
+            "name": "test_tool",
+            "description": "A test tool",
+            "inputSchema": {"json": {"type": "object", "properties": {"param": {"type": "string"}}}},
+        }
+        return mock_tool
+
+    @pytest.fixture
+    def mock_connection_config(self, mock_mcp_client):
+        """Create a mock connection config."""
+        mock_client_class, mock_instance = mock_mcp_client
+        connection_info = ConnectionInfo(
+            connection_id="test_connection",
+            mcp_client=mock_instance,
+            transport="stdio",
+            url="python server.py",
+            register_time=0,
+            is_active=True,
+        )
+        _connections["test_connection"] = connection_info
+        return connection_info
+
+    def test_mcp_tool_initialization(self, mock_mcp_tool):
+        """Test MCPTool initialization."""
+        mcp_tool = MCPTool(mock_mcp_tool, "test_connection")
+
+        assert mcp_tool.tool_name == "test_tool"
+        assert mcp_tool.tool_spec == mock_mcp_tool.tool_spec
+        assert mcp_tool.tool_type == "mcp_dynamic"
+        assert mcp_tool._connection_id == "test_connection"
+        assert mcp_tool._mcp_tool == mock_mcp_tool
+
+    def test_mcp_tool_display_properties(self, mock_mcp_tool):
+        """Test MCPTool display properties."""
+        mcp_tool = MCPTool(mock_mcp_tool, "test_connection")
+        props = mcp_tool.get_display_properties()
+
+        assert props["Name"] == "test_tool"
+        assert props["Type"] == "mcp_dynamic"
+        assert props["Connection ID"] == "test_connection"
+
+    @pytest.mark.asyncio
+    async def test_mcp_tool_stream_success(self, mock_mcp_tool, mock_connection_config):
+        """Test successful MCPTool stream execution."""
+        mcp_tool = MCPTool(mock_mcp_tool, "test_connection")
+
+        # Mock the async call_tool_async method
+        mock_result = {
+            "toolUseId": "test-tool-use-id",
+            "status": "success",
+            "content": [{"text": "Tool executed successfully"}],
+        }
+        mock_connection_config.mcp_client.call_tool_async = AsyncMock(return_value=mock_result)
+
+        tool_use = {"toolUseId": "test-tool-use-id", "name": "test_tool", "input": {"param": "test_value"}}
+
+        # Execute the stream method
+        results = []
+        async for result in mcp_tool.stream(tool_use, {}):
+            results.append(result)
+
+        assert len(results) == 1
+        assert results[0] == mock_result
+
+        # Verify the call_tool_async was called correctly
+        mock_connection_config.mcp_client.call_tool_async.assert_called_once_with(
+            tool_use_id="test-tool-use-id", name="test_tool", arguments={"param": "test_value"}
+        )
+
+    @pytest.mark.asyncio
+    async def test_mcp_tool_stream_connection_not_found(self, mock_mcp_tool):
+        """Test MCPTool stream when connection is not found."""
+        mcp_tool = MCPTool(mock_mcp_tool, "nonexistent_connection")
+
+        tool_use = {"toolUseId": "test-tool-use-id", "name": "test_tool", "input": {"param": "test_value"}}
+
+        # Execute the stream method
+        results = []
+        async for result in mcp_tool.stream(tool_use, {}):
+            results.append(result)
+
+        assert len(results) == 1
+        assert results[0]["toolUseId"] == "test-tool-use-id"
+        assert results[0]["status"] == "error"
+        assert "Connection 'nonexistent_connection' not found" in results[0]["content"][0]["text"]
+
+    @pytest.mark.asyncio
+    async def test_mcp_tool_stream_connection_inactive(self, mock_mcp_tool, mock_connection_config):
+        """Test MCPTool stream when connection is inactive."""
+        # Make connection inactive
+        mock_connection_config.is_active = False
+
+        mcp_tool = MCPTool(mock_mcp_tool, "test_connection")
+
+        tool_use = {"toolUseId": "test-tool-use-id", "name": "test_tool", "input": {"param": "test_value"}}
+
+        # Execute the stream method
+        results = []
+        async for result in mcp_tool.stream(tool_use, {}):
+            results.append(result)
+
+        assert len(results) == 1
+        assert results[0]["toolUseId"] == "test-tool-use-id"
+        assert results[0]["status"] == "error"
+        assert "Connection 'test_connection' is not active" in results[0]["content"][0]["text"]
+
+    @pytest.mark.asyncio
+    async def test_mcp_tool_stream_execution_error(self, mock_mcp_tool, mock_connection_config):
+        """Test MCPTool stream when tool execution fails."""
+        mcp_tool = MCPTool(mock_mcp_tool, "test_connection")
+
+        # Mock the async call_tool_async method to raise an exception
+        mock_connection_config.mcp_client.call_tool_async = AsyncMock(side_effect=Exception("Tool execution failed"))
+
+        tool_use = {"toolUseId": "test-tool-use-id", "name": "test_tool", "input": {"param": "test_value"}}
+
+        # Execute the stream method
+        results = []
+        async for result in mcp_tool.stream(tool_use, {}):
+            results.append(result)
+
+        assert len(results) == 1
+        assert results[0]["toolUseId"] == "test-tool-use-id"
+        assert results[0]["status"] == "error"
+        assert "Failed to execute tool 'test_tool'" in results[0]["content"][0]["text"]
+        assert "Tool execution failed" in results[0]["content"][0]["text"]
+
+        # Verify connection was marked as inactive
+        assert not mock_connection_config.is_active
+        assert mock_connection_config.last_error == "Tool execution failed"
+
+    @pytest.mark.asyncio
+    async def test_mcp_tool_stream_uses_context_manager(self, mock_mcp_tool, mock_connection_config):
+        """Test that MCPTool stream uses the context manager pattern."""
+        mcp_tool = MCPTool(mock_mcp_tool, "test_connection")
+
+        # Mock the async call_tool_async method
+        mock_result = {"toolUseId": "test-tool-use-id", "status": "success", "content": [{"text": "Success"}]}
+        mock_connection_config.mcp_client.call_tool_async = AsyncMock(return_value=mock_result)
+
+        tool_use = {"toolUseId": "test-tool-use-id", "name": "test_tool", "input": {"param": "test_value"}}
+
+        # Execute the stream method
+        results = []
+        async for result in mcp_tool.stream(tool_use, {}):
+            results.append(result)
+
+        # Verify that the context manager was used
+        mock_connection_config.mcp_client.__enter__.assert_called_once()
+        mock_connection_config.mcp_client.__exit__.assert_called_once()
+
+
+class TestMCPClientInvalidAction:
+    """Test handling of invalid actions."""
+
+    def test_invalid_action(self):
+        """Test calling with an invalid action."""
+        result = dynamic_mcp_client(action="invalid_action", connection_id="test")
+
+        assert result["status"] == "error"
+        assert "Unknown action: invalid_action" in result["content"][0]["text"]
+        assert "Available actions:" in result["content"][0]["text"]
+
+
+class TestBinaryDataHandling:
+    """Test binary data handling through direct tool calls."""
+
+    def test_call_server_tool_with_binary_data(self, mock_mcp_client, mock_stdio_client):
+        """Test calling a tool that returns binary data directly."""
+        # Mock the MCP client to return binary data
+        _, mock_instance = mock_mcp_client
+        mock_result = {
+            "status": "success",
+            "toolUseId": "test-tool-use-id",
+            "content": [
+                {"type": "text", "text": "Generated image"},
+                {"type": "image", "data": "[BASE64_DATA]iVBORw0KGgoAAAANSUhE..."},
+                {"type": "text", "text": "Raw bytes"},
+            ],
+        }
+        mock_instance.call_tool_sync.return_value = mock_result
+
+        # Connect first
+        dynamic_mcp_client(
+            action="connect", connection_id="binary_tool_test", transport="stdio", command="python", args=["server.py"]
+        )
+
+        # Call the tool that returns binary data
+        result = dynamic_mcp_client(
+            action="call_tool",
+            connection_id="binary_tool_test",
+            tool_name="generate_image",
+            tool_args={"width": 100, "height": 100},
+        )
+
+        # Verify the result - it should return the mock result directly
+        assert result["status"] == "success"
+        assert result["toolUseId"] == "test-tool-use-id"
+        assert len(result["content"]) == 3
+
+
+class TestMCPClientCleanup:
+    """Test cleanup functionality for MCP client tools."""
+
+    def test_disconnect_cleans_up_tools(self, mock_mcp_client, mock_stdio_client):
+        """Test that disconnecting cleans up loaded tools."""
+        # Create a mock agent
+        mock_agent = MagicMock()
+        mock_registry = MagicMock()
+        mock_registry.unregister_tool = MagicMock()
+        mock_agent.tool_registry = mock_registry
+
+        # Connect first
+        dynamic_mcp_client(
+            action="connect", connection_id="test_connection", transport="stdio", command="python", args=["server.py"]
+        )
+
+        # Manually set some loaded tools on the connection
+        if "test_connection" in _connections:
+            _connections["test_connection"].loaded_tool_names = ["test_tool", "another_tool"]
+
+        # Call disconnect with the agent
+        result = dynamic_mcp_client(action="disconnect", connection_id="test_connection", agent=mock_agent)
+
+        # Check that unregister_tool was called for each loaded tool
+        assert mock_agent.tool_registry.unregister_tool.call_count == 2
+        mock_agent.tool_registry.unregister_tool.assert_any_call("test_tool")
+        mock_agent.tool_registry.unregister_tool.assert_any_call("another_tool")
+
+        # Check the result
+        assert result["status"] == "success"
+        # Access structured data from new ToolResult format
+        disconnect_data = result["content"][1]["json"]
+        assert "cleaned_tools" in disconnect_data
+        assert len(disconnect_data["cleaned_tools"]) == 2
+        assert "test_connection" not in _connections
+
+    def test_call_tool_cleans_up_on_connection_failure(self, mock_mcp_client):
+        """Test that call_tool cleans up tools when connection fails during call."""
+        # Create a mock agent
+        mock_agent = MagicMock()
+        mock_registry = MagicMock()
+        mock_registry.unregister_tool = MagicMock()
+        mock_agent.tool_registry = mock_registry
+
+        # Setup mock client that fails on tool call
+        _, mock_instance = mock_mcp_client
+        mock_instance.call_tool_sync.side_effect = Exception("Connection failed during call")
+
+        # Connect first
+        dynamic_mcp_client(
+            action="connect",
+            connection_id="failing_call_connection",
+            transport="stdio",
+            command="python",
+            args=["server.py"],
+        )
+
+        # Manually set some loaded tools
+        if "failing_call_connection" in _connections:
+            _connections["failing_call_connection"].loaded_tool_names = ["test_tool"]
+
+        # Call the tool (should fail and trigger cleanup)
+        result = dynamic_mcp_client(
+            action="call_tool", connection_id="failing_call_connection", tool_name="test_tool", agent=mock_agent
+        )
+
+        # Check the result indicates failure
+        assert result["status"] == "error"
+        assert "Failed to call tool" in result["content"][0]["text"]
+
+
+class TestMCPClientIntegration:
+    """Integration tests for full workflows."""
+
+    def test_full_workflow(self, mock_mcp_client, mock_stdio_client):
+        """Test a complete workflow: connect, list tools, call tool, disconnect."""
+        # 1. Connect
+        connect_result = dynamic_mcp_client(
+            action="connect", connection_id="workflow_server", transport="stdio", command="python", args=["server.py"]
+        )
+        assert connect_result["status"] == "success"
+
+        # 2. List connections
+        list_result = dynamic_mcp_client(action="list_connections")
+        assert list_result["status"] == "success"
+        # Access structured data from new ToolResult format
+        connections_data = list_result["content"][1]["json"]
+        assert connections_data["total_connections"] == 1
+
+        # 3. List tools
+        tools_result = dynamic_mcp_client(action="list_tools", connection_id="workflow_server")
+        assert tools_result["status"] == "success"
+        # Access structured data from new ToolResult format
+        tools_data = tools_result["content"][1]["json"]
+        assert tools_data["tools_count"] == 1
+
+        # 4. Call a tool
+        call_result = dynamic_mcp_client(
+            action="call_tool", connection_id="workflow_server", tool_name="test_tool", tool_args={"param": "test"}
+        )
+        assert call_result["status"] == "success"
+
+        # 5. Disconnect
+        disconnect_result = dynamic_mcp_client(action="disconnect", connection_id="workflow_server")
+        assert disconnect_result["status"] == "success"
+
+        # 6. Verify connection is gone
+        final_list = dynamic_mcp_client(action="list_connections")
+        final_connections_data = final_list["content"][1]["json"]
+        assert final_connections_data["total_connections"] == 0
+
+
+# The MCPToolWrapper class functionality is now tested through the MCPTool class tests above
+
+
+# Configuration tests are handled within other test classes
+
+# Result handling is tested through MCPTool class tests above
diff --git a/tests_integ/mcp_client/__init__.py b/tests_integ/mcp_client/__init__.py
new file mode 100644
index 00000000..7025e40d
--- /dev/null
+++ b/tests_integ/mcp_client/__init__.py
@@ -0,0 +1 @@
+"""Integration tests for MCP client tools."""
\ No newline at end of file
diff --git a/tests_integ/mcp_client/mock_mcp_server.py b/tests_integ/mcp_client/mock_mcp_server.py
new file mode 100644
index 00000000..c6edc399
--- /dev/null
+++ b/tests_integ/mcp_client/mock_mcp_server.py
@@ -0,0 +1,132 @@
+"""Mock MCP Server for integration testing using FastMCP.
+
+This server provides a simple echo tool for testing the dynamic MCP client.
+It supports both stdio and SSE transports.
+"""
+
+import asyncio
+import logging
+import os
+import sys
+import threading
+import time
+from typing import Any, Dict
+
+from mcp.server import FastMCP
+from mcp.types import TextContent
+
+# Configure logging
+logging.basicConfig(level=logging.INFO, stream=sys.stderr)
+logger = logging.getLogger(__name__)
+
+# Create FastMCP server
+app = FastMCP("MockMCPServer")
+
+
+@app.tool()
+def echo_tool(message: str) -> str:
+    """Echo the provided message back to the caller.
+    
+    Args:
+        message: The message to echo back
+        
+    Returns:
+        The echoed message with a prefix
+    """
+    return f"Echo: {message}"
+
+
+@app.tool()
+def add_numbers(a: int, b: int) -> int:
+    """Add two numbers together.
+    
+    Args:
+        a: First number
+        b: Second number
+        
+    Returns:
+        The sum of a and b
+    """
+    return a + b
+
+
+class MockMCPServer:
+    """Mock MCP Server that can run in stdio or SSE mode."""
+    
+    def __init__(self):
+        self.server_task = None
+        self.server_thread = None
+        
+    async def run_stdio_async(self):
+        """Run the server in stdio mode."""
+        logger.info("Starting MockMCP server in stdio mode")
+        await app.run_stdio_async()
+        
+    async def run_sse_async(self, host: str = "localhost", port: int = 8000):
+        """Run the server in SSE mode using uvicorn."""
+        logger.info(f"Starting MockMCP server in SSE mode on {host}:{port}")
+        
+        # Get the SSE app from FastMCP
+        sse_app = app.sse_app()
+        
+        # Use uvicorn to run the SSE app
+        import uvicorn
+        config = uvicorn.Config(
+            app=sse_app,
+            host=host,
+            port=port,
+            log_level="info"
+        )
+        server = uvicorn.Server(config)
+        await server.serve()
+        
+    def run_stdio_sync(self):
+        """Run the server in stdio mode synchronously."""
+        try:
+            asyncio.run(self.run_stdio_async())
+        except KeyboardInterrupt:
+            logger.info("MockMCP stdio server stopped")
+        except Exception as e:
+            logger.error(f"MockMCP stdio server error: {e}")
+            
+    def run_sse_sync(self, host: str = "localhost", port: int = 8000):
+        """Run the server in SSE mode synchronously."""
+        try:
+            asyncio.run(self.run_sse_async(host=host, port=port))
+        except KeyboardInterrupt:
+            logger.info("MockMCP SSE server stopped")
+        except Exception as e:
+            logger.error(f"MockMCP SSE server error: {e}")
+            
+    def start_stdio_in_thread(self):
+        """Start the stdio server in a separate thread."""
+        self.server_thread = threading.Thread(target=self.run_stdio_sync, daemon=True)
+        self.server_thread.start()
+        return self.server_thread
+        
+    def start_sse_in_thread(self, host: str = "localhost", port: int = 8000):
+        """Start the SSE server in a separate thread."""
+        self.server_thread = threading.Thread(
+            target=self.run_sse_sync, 
+            args=(host, port), 
+            daemon=True
+        )
+        self.server_thread.start()
+        return self.server_thread
+
+
+def main():
+    """Main entry point for running the server."""
+    if len(sys.argv) > 1 and sys.argv[1] == "sse":
+        # SSE mode with optional port
+        port = int(sys.argv[2]) if len(sys.argv) > 2 else 8000
+        server = MockMCPServer()
+        server.run_sse_sync(port=port)
+    else:
+        # Default to stdio mode
+        server = MockMCPServer()
+        server.run_stdio_sync()
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/tests_integ/mcp_client/test_dynamic_mcp_client.py b/tests_integ/mcp_client/test_dynamic_mcp_client.py
new file mode 100644
index 00000000..bc4672df
--- /dev/null
+++ b/tests_integ/mcp_client/test_dynamic_mcp_client.py
@@ -0,0 +1,502 @@
+"""
+Integration tests for the Dynamic MCP Client tool.
+
+These tests use a MockMCP server (FastMCP) to validate end-to-end functionality 
+of the dynamic_mcp_client tool with Strands agents.
+"""
+
+import os
+from unittest.mock import patch
+
+import pytest
+from strands import Agent
+from strands_tools import dynamic_mcp_client
+
+from .test_helpers import parse_tool_result, stdio_mcp_server, sse_mcp_server
+
+
+@pytest.fixture(autouse=True)
+def bypass_tool_consent():
+    """Bypass tool consent for integration tests."""
+    with patch.dict(os.environ, {"BYPASS_TOOL_CONSENT": "true"}):
+        yield
+
+
+@pytest.fixture
+def agent():
+    """Create an Agent instance configured with dynamic MCP client tool."""
+    return Agent(tools=[dynamic_mcp_client])
+
+
+class TestMCPClientBasicFunctionality:
+    """Test basic MCP client functionality that doesn't require real servers."""
+
+    def test_list_connections_empty(self, agent):
+        """Test listing connections when none exist."""
+        result = agent.tool.dynamic_mcp_client(action="list_connections")
+        
+        assert result["status"] == "success"
+        # Text message
+        assert "Found 0 MCP connections" in result["content"][0]["text"]
+        
+        # Structured data
+        connections_data = result["content"][1]["json"]
+        assert connections_data["total_connections"] == 0
+        assert connections_data["connections"] == []
+
+    def test_invalid_action(self, agent):
+        """Test calling with an invalid action."""
+        result = agent.tool.dynamic_mcp_client(action="invalid_action", connection_id="test")
+
+        assert result["status"] == "error"
+        assert "Unknown action: invalid_action" in result["content"][0]["text"]
+        assert "Available actions:" in result["content"][0]["text"]
+
+
+class TestStdioMCPServerIntegration:
+    """Test integration with stdio-based MCP servers using MockMCP."""
+    
+    def test_stdio_server_basic_connection(self, agent, stdio_mcp_server):
+        """Test basic connection to a stdio MCP server."""
+        result = agent.tool.dynamic_mcp_client(
+            action="connect",
+            connection_id="stdio_test_server",
+            transport="stdio", 
+            command="python",
+            args=[stdio_mcp_server]
+        )
+        
+        assert result["status"] == "success"
+        # Access structured data from new ToolResult format
+        connection_data = result["content"][1]["json"]
+        assert connection_data["connection_id"] == "stdio_test_server"
+        assert connection_data["transport"] == "stdio"
+        assert connection_data["tools_count"] > 0
+        assert "echo_tool" in connection_data["available_tools"]
+        
+        # Cleanup
+        agent.tool.dynamic_mcp_client(action="disconnect", connection_id="stdio_test_server")
+    
+    def test_stdio_server_list_tools(self, agent, stdio_mcp_server):
+        """Test listing tools from a stdio MCP server."""
+        # Connect first
+        agent.tool.dynamic_mcp_client(
+            action="connect",
+            connection_id="stdio_list_test",
+            transport="stdio",
+            command="python", 
+            args=[stdio_mcp_server]
+        )
+        
+        # List tools
+        result = agent.tool.dynamic_mcp_client(
+            action="list_tools",
+            connection_id="stdio_list_test"
+        )
+        
+        assert result["status"] == "success"
+        # Access structured data from new ToolResult format
+        tools_data = result["content"][1]["json"]
+        assert tools_data["tools_count"] >= 2  # echo_tool and add_numbers
+        assert len(tools_data["tools"]) >= 2
+        
+        # Check that we have our expected tools
+        tool_names = [tool["name"] for tool in tools_data["tools"]]
+        assert "echo_tool" in tool_names
+        assert "add_numbers" in tool_names
+        
+        # Check tool structure
+        echo_tool = next(tool for tool in tools_data["tools"] if tool["name"] == "echo_tool")
+        assert "description" in echo_tool
+        assert "input_schema" in echo_tool
+        # FastMCP tools have a nested json structure in their input schema
+        assert "json" in echo_tool["input_schema"]
+        assert echo_tool["input_schema"]["json"]["type"] == "object"
+        
+        # Cleanup
+        agent.tool.dynamic_mcp_client(action="disconnect", connection_id="stdio_list_test")
+    
+    def test_stdio_server_call_tool(self, agent, stdio_mcp_server):
+        """Test calling a tool on a stdio MCP server."""
+        # Connect first
+        agent.tool.dynamic_mcp_client(
+            action="connect",
+            connection_id="stdio_call_test",
+            transport="stdio",
+            command="python",
+            args=[stdio_mcp_server]
+        )
+        
+        # Call echo_tool
+        result = agent.tool.dynamic_mcp_client(
+            action="call_tool",
+            connection_id="stdio_call_test",
+            tool_name="echo_tool",
+            tool_args={"message": "integration test"}
+        )
+        
+        assert result["status"] == "success"
+        # With the simplified implementation, result is now returned directly from SDK
+        assert result["toolUseId"] is not None
+        
+        # Check that the echo worked
+        content = result["content"]
+        assert len(content) > 0
+        assert "Echo: integration test" in content[0]["text"]
+        
+        # Test add_numbers tool
+        result = agent.tool.dynamic_mcp_client(
+            action="call_tool",
+            connection_id="stdio_call_test",
+            tool_name="add_numbers",
+            tool_args={"a": 5, "b": 3}
+        )
+        
+        assert result["status"] == "success"
+        # With the simplified implementation, result is now returned directly from SDK
+        assert result["toolUseId"] is not None
+        
+        # Cleanup
+        agent.tool.dynamic_mcp_client(action="disconnect", connection_id="stdio_call_test")
+    
+    def test_stdio_server_with_environment_variables(self, agent, stdio_mcp_server):
+        """Test stdio server with custom environment variables."""
+        result = agent.tool.dynamic_mcp_client(
+            action="connect",
+            connection_id="stdio_env_test",
+            transport="stdio",
+            command="python",
+            args=[stdio_mcp_server],
+            env={"TEST_VAR": "integration_test_value", "DEBUG": "true"}
+        )
+        
+        assert result["status"] == "success"
+        # Access structured data from new ToolResult format
+        connection_data = result["content"][1]["json"]
+        assert connection_data["connection_id"] == "stdio_env_test"
+        
+        # Cleanup
+        agent.tool.dynamic_mcp_client(action="disconnect", connection_id="stdio_env_test")
+
+
+class TestSSEMCPServerIntegration:
+    """Test integration with SSE-based MCP servers using MockMCP."""
+    
+    def test_sse_server_basic_connection(self, agent, sse_mcp_server):
+        """Test basic connection to an SSE MCP server."""
+        result = agent.tool.dynamic_mcp_client(
+            action="connect",
+            connection_id="sse_test_server",
+            transport="sse",
+            server_url=sse_mcp_server
+        )
+        
+        assert result["status"] == "success"
+        # Access structured data from new ToolResult format
+        connection_data = result["content"][1]["json"]
+        assert connection_data["connection_id"] == "sse_test_server"
+        assert connection_data["transport"] == "sse"
+        assert connection_data["tools_count"] > 0
+        assert "echo_tool" in connection_data["available_tools"]
+        
+        # Cleanup
+        agent.tool.dynamic_mcp_client(action="disconnect", connection_id="sse_test_server")
+    
+    def test_sse_server_reliability(self, agent, sse_mcp_server):
+        """Test SSE connection stability and basic operations."""
+        # Connect
+        connect_result = agent.tool.dynamic_mcp_client(
+            action="connect",
+            connection_id="sse_reliability_test",
+            transport="sse",
+            server_url=sse_mcp_server
+        )
+        assert connect_result["status"] == "success"
+        
+        # Multiple operations to test stability
+        for i in range(3):
+            list_result = agent.tool.dynamic_mcp_client(
+                action="list_tools",
+                connection_id="sse_reliability_test"
+            )
+            assert list_result["status"] == "success"
+            
+            call_result = agent.tool.dynamic_mcp_client(
+                action="call_tool",
+                connection_id="sse_reliability_test", 
+                tool_name="echo_tool",
+                tool_args={"message": f"test_{i}"}
+            )
+            assert call_result["status"] == "success"
+        
+        # Cleanup
+        agent.tool.dynamic_mcp_client(action="disconnect", connection_id="sse_reliability_test")
+
+
+class TestEndToEndWorkflows:
+    """Test complete end-to-end workflows with real MCP servers."""
+    
+    def test_full_workflow_stdio_server(self, agent, stdio_mcp_server):
+        """Test complete workflow: connect → list tools → call tool → load tools → disconnect."""
+        connection_id = "full_workflow_test"
+        
+        # 1. Connect
+        connect_result = agent.tool.dynamic_mcp_client(
+            action="connect",
+            connection_id=connection_id,
+            transport="stdio",
+            command="python",
+            args=[stdio_mcp_server]
+        )
+        assert connect_result["status"] == "success"
+        
+        # 2. List connections
+        list_conn_result = agent.tool.dynamic_mcp_client(action="list_connections")
+        assert list_conn_result["status"] == "success"
+        # Access structured data from new ToolResult format
+        connections_data = list_conn_result["content"][1]["json"]
+        assert connections_data["total_connections"] >= 1
+        
+        # Find our connection
+        our_conn = next(
+            (c for c in connections_data["connections"] if c["connection_id"] == connection_id),
+            None
+        )
+        assert our_conn is not None
+        assert our_conn["is_active"] is True
+        
+        # 3. List tools
+        list_tools_result = agent.tool.dynamic_mcp_client(
+            action="list_tools",
+            connection_id=connection_id
+        )
+        assert list_tools_result["status"] == "success"
+        # Access structured data from new ToolResult format
+        tools_data = list_tools_result["content"][1]["json"]
+        assert tools_data["tools_count"] >= 2
+        
+        # 4. Call a tool
+        call_result = agent.tool.dynamic_mcp_client(
+            action="call_tool",
+            connection_id=connection_id,
+            tool_name="echo_tool",
+            tool_args={"message": "full workflow test"}
+        )
+        assert call_result["status"] == "success"
+        
+        # 5. Load tools into agent
+        load_result = agent.tool.dynamic_mcp_client(
+            action="load_tools",
+            connection_id=connection_id,
+            agent=agent
+        )
+        assert load_result["status"] == "success"
+        # Access structured data from new ToolResult format
+        load_data = load_result["content"][1]["json"]
+        assert load_data["tool_count"] >= 2
+        assert "echo_tool" in load_data["loaded_tools"]
+        
+        # 6. Disconnect
+        disconnect_result = agent.tool.dynamic_mcp_client(
+            action="disconnect",
+            connection_id=connection_id,
+            agent=agent  # Pass agent for tool cleanup
+        )
+        assert disconnect_result["status"] == "success"
+        
+        # 7. Verify connection is gone
+        final_list = agent.tool.dynamic_mcp_client(action="list_connections")
+        final_connections_data = final_list["content"][1]["json"]
+        remaining_connections = [
+            c for c in final_connections_data["connections"] 
+            if c["connection_id"] == connection_id
+        ]
+        assert len(remaining_connections) == 0
+    
+    def test_tool_loading_and_execution_through_agent(self, agent, stdio_mcp_server):
+        """Test loading MCP tools into agent and executing them via agent calls."""
+        connection_id = "tool_loading_test"
+        
+        # Connect and load tools
+        agent.tool.dynamic_mcp_client(
+            action="connect",
+            connection_id=connection_id,
+            transport="stdio",
+            command="python",
+            args=[stdio_mcp_server]
+        )
+        
+        load_result = agent.tool.dynamic_mcp_client(
+            action="load_tools",
+            connection_id=connection_id,
+            agent=agent
+        )
+        assert load_result["status"] == "success"
+        # Access structured data from new ToolResult format
+        load_data = load_result["content"][1]["json"]
+        assert load_data["tool_count"] >= 2
+        
+        # The tools should now be in the agent's registry
+        tool_names = load_data["loaded_tools"]
+        assert len(tool_names) >= 2
+        assert "echo_tool" in tool_names
+        assert "add_numbers" in tool_names
+        
+        # 🚀 NEW: Actually test calling the loaded tools through the agent interface
+        try:
+            # Test calling echo_tool directly through the agent
+            echo_result = agent.tool.echo_tool(message="test from agent interface")
+            assert echo_result["status"] == "success"
+            assert "Echo: test from agent interface" in echo_result["content"][0]["text"]
+            
+            # Test calling add_numbers directly through the agent
+            add_result = agent.tool.add_numbers(a=7, b=13)
+            
+            # The result should be in proper Strands format 
+            assert add_result["status"] == "success"
+            assert add_result["content"][0]["text"] == "20"  # 7 + 13 = 20
+            
+        except AttributeError as e:
+            # This would indicate the tools weren't properly loaded into the agent
+            assert False, f"Loaded tools not accessible through agent.tool interface: {e}"
+        
+        # Cleanup
+        agent.tool.dynamic_mcp_client(
+            action="disconnect",
+            connection_id=connection_id,
+            agent=agent
+        )
+
+
+class TestErrorHandlingAndReliability:
+    """Test error handling and reliability scenarios."""
+    
+    def test_connection_to_nonexistent_server(self, agent):
+        """Test connecting to a non-existent server."""
+        result = agent.tool.dynamic_mcp_client(
+            action="connect",
+            connection_id="nonexistent_test",
+            transport="stdio",
+            command="nonexistent_command",
+            args=["fake_server.py"]
+        )
+        
+        assert result["status"] == "error"
+        assert "Connection failed" in result["content"][0]["text"]
+    
+    def test_invalid_server_url_http(self, agent):
+        """Test connecting to invalid HTTP server URL."""
+        result = agent.tool.dynamic_mcp_client(
+            action="connect",
+            connection_id="invalid_http_test",
+            transport="streamable_http",
+            server_url="http://localhost:99999/invalid"
+        )
+        
+        assert result["status"] == "error"
+        assert "Connection failed" in result["content"][0]["text"]
+    
+    def test_invalid_server_url_sse(self, agent):
+        """Test connecting to invalid SSE server URL."""
+        result = agent.tool.dynamic_mcp_client(
+            action="connect",
+            connection_id="invalid_sse_test", 
+            transport="sse",
+            server_url="http://localhost:99999/invalid"
+        )
+        
+        assert result["status"] == "error"
+        assert "Connection failed" in result["content"][0]["text"]
+    
+    def test_missing_required_parameters(self, agent):
+        """Test error handling for missing required parameters."""
+        # Missing connection_id for connect
+        result = agent.tool.dynamic_mcp_client(
+            action="connect",
+            transport="stdio",
+            command="python"
+        )
+        assert result["status"] == "error"
+        assert "connection_id is required" in result["content"][0]["text"]
+        
+        # Missing server_url for HTTP
+        result = agent.tool.dynamic_mcp_client(
+            action="connect",
+            connection_id="test",
+            transport="streamable_http"
+        )
+        assert result["status"] == "error"
+        assert "server_url is required" in result["content"][0]["text"]
+        
+        # Missing tool_name for call_tool
+        result = agent.tool.dynamic_mcp_client(
+            action="call_tool",
+            connection_id="test"
+        )
+        assert result["status"] == "error"
+        assert "tool_name is required" in result["content"][0]["text"]
+
+    def test_operations_on_nonexistent_connections(self, agent):
+        """Test operations on connections that don't exist."""
+        # Try to list tools from non-existent connection
+        result = agent.tool.dynamic_mcp_client(
+            action="list_tools",
+            connection_id="nonexistent_connection"
+        )
+        assert result["status"] == "error"
+        assert "Connection 'nonexistent_connection' not found" in result["content"][0]["text"]
+        
+        # Try to call tool on non-existent connection
+        result = agent.tool.dynamic_mcp_client(
+            action="call_tool",
+            connection_id="nonexistent_connection",
+            tool_name="test_tool"
+        )
+        assert result["status"] == "error"
+        assert "Connection 'nonexistent_connection' not found" in result["content"][0]["text"]
+        
+        # Try to disconnect from non-existent connection
+        result = agent.tool.dynamic_mcp_client(
+            action="disconnect",
+            connection_id="nonexistent_connection"
+        )
+        assert result["status"] == "error"
+        assert "Connection 'nonexistent_connection' not found" in result["content"][0]["text"]
+
+    def test_load_tools_without_agent(self, agent):
+        """Test load_tools action without providing agent instance."""
+        result = agent.tool.dynamic_mcp_client(
+            action="load_tools",
+            connection_id="test_connection"
+            # Note: agent parameter is not provided
+        )
+        assert result["status"] == "error"
+        assert "agent instance is required" in result["content"][0]["text"]
+
+
+class TestConfigurationAndParameterHandling:
+    """Test configuration and parameter handling."""
+    
+    def test_transport_parameter_validation(self, agent):
+        """Test validation of transport parameters."""
+        # Test unsupported transport (should fail during connection)
+        result = agent.tool.dynamic_mcp_client(
+            action="connect",
+            connection_id="test",
+            transport="unsupported_transport"
+        )
+        assert result["status"] == "error"
+        assert "Connection failed" in result["content"][0]["text"]
+    
+    def test_server_config_vs_direct_parameters(self, agent):
+        """Test that direct parameters override server_config."""
+        # This should fail because command is missing, but it tests parameter precedence
+        result = agent.tool.dynamic_mcp_client(
+            action="connect",
+            connection_id="test",
+            server_config={"transport": "sse", "server_url": "http://example.com"},
+            transport="stdio"  # This should override the server_config transport
+        )
+        assert result["status"] == "error"
+        # Should complain about missing command (stdio transport) not server_url (sse transport)
+        assert "command is required for stdio transport" in result["content"][0]["text"]
\ No newline at end of file
diff --git a/tests_integ/mcp_client/test_helpers.py b/tests_integ/mcp_client/test_helpers.py
new file mode 100644
index 00000000..3c7c7b2f
--- /dev/null
+++ b/tests_integ/mcp_client/test_helpers.py
@@ -0,0 +1,150 @@
+"""Test helpers for MCP client integration tests."""
+
+import ast
+import json
+import os
+import signal
+import socket
+import subprocess
+import sys
+import time
+from pathlib import Path
+from typing import Optional
+
+import pytest
+
+from .mock_mcp_server import MockMCPServer
+
+
+def find_free_port() -> int:
+    """Find and return an available port."""
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+        s.bind(("", 0))
+        return s.getsockname()[1]
+
+
+def wait_for_server(host: str, port: int, timeout: int = 10) -> bool:
+    """Wait for a server to become available."""
+    start_time = time.time()
+    while time.time() - start_time < timeout:
+        try:
+            with socket.create_connection((host, port), timeout=0.1):
+                return True
+        except (ConnectionRefusedError, socket.timeout, OSError):
+            time.sleep(0.1)
+    return False
+
+
+class StdioMCPServerManager:
+    """Manager for stdio-based MCP server processes."""
+    
+    def __init__(self):
+        self.process: Optional[subprocess.Popen] = None
+        self.script_path = Path(__file__).parent / "mock_mcp_server.py"
+        
+    def start(self) -> str:
+        """Start the stdio MCP server and return the script path."""
+        # The script path is what we need for stdio connections
+        return str(self.script_path.absolute())
+        
+    def stop(self):
+        """Stop the stdio MCP server."""
+        if self.process:
+            self.process.terminate()
+            self.process.wait()
+            self.process = None
+
+
+class SSEMCPServerManager:
+    """Manager for SSE-based MCP server processes."""
+    
+    def __init__(self):
+        self.process: Optional[subprocess.Popen] = None
+        self.port: Optional[int] = None
+        self.script_path = Path(__file__).parent / "mock_mcp_server.py"
+        
+    def start(self) -> str:
+        """Start the SSE MCP server and return the server URL."""
+        self.port = find_free_port()
+        
+        # Start the server process
+        self.process = subprocess.Popen([
+            sys.executable, 
+            str(self.script_path), 
+            "sse", 
+            str(self.port)
+        ], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        
+        # Wait for server to be ready
+        if not wait_for_server("localhost", self.port, timeout=10):
+            self.stop()
+            raise RuntimeError(f"SSE MCP server failed to start on port {self.port}")
+            
+        return f"http://localhost:{self.port}/sse"
+        
+    def stop(self):
+        """Stop the SSE MCP server."""
+        if self.process:
+            self.process.terminate()
+            try:
+                self.process.wait(timeout=5)
+            except subprocess.TimeoutExpired:
+                self.process.kill()
+                self.process.wait()
+            self.process = None
+
+
+@pytest.fixture(scope="session")
+def stdio_mcp_server():
+    """Fixture providing a stdio MCP server for testing."""
+    manager = StdioMCPServerManager()
+    server_script = manager.start()
+    yield server_script
+    manager.stop()
+
+
+@pytest.fixture(scope="session")
+def sse_mcp_server():
+    """Fixture providing an SSE MCP server for testing.""" 
+    manager = SSEMCPServerManager()
+    try:
+        server_url = manager.start()
+        yield server_url
+    finally:
+        manager.stop()
+
+
+def parse_tool_result(result):
+    """Parse the tool result from agent.tool calls that may return serialized data.
+    
+    Agent calls return: {'toolUseId': ..., 'status': 'success', 'content': [{'text': '...'}]}
+    Some tools may serialize complex data in the content[0]['text'] field.
+    
+    NOTE: This function is DEPRECATED for dynamic_mcp_client as of the ToolResult format fix.
+    The dynamic_mcp_client now returns proper ToolResult format directly:
+    - Text message in content[0]['text']
+    - Structured data in content[1]['json']
+    
+    This helper is kept for backward compatibility with other tools that may still
+    serialize their results, but should not be needed for properly implemented tools.
+    
+    Example:
+    - dynamic_mcp_client calls: Use results directly (no parsing needed)
+    - Other tools that serialize: May still need parse_tool_result()
+    - Loaded MCP tools: Use results directly (no parsing needed)
+    """
+    if result.get('status') != 'success':
+        return result
+    
+    try:
+        text = result['content'][0]['text']
+        # Try JSON parsing first
+        try:
+            actual_result = json.loads(text)
+            return actual_result
+        except json.JSONDecodeError:
+            # Try evaluating as Python literal (safe eval for dict/list/etc)
+            actual_result = ast.literal_eval(text)
+            return actual_result
+    except (KeyError, IndexError, ValueError, SyntaxError):
+        return result
\ No newline at end of file