The Model Context Protocol (MCP) implementation for the GeneralizedNotationNotation (GNN) project provides a server with features for tool discovery, registration, execution, and monitoring. This implementation extends the standard MCP specification with error handling, performance monitoring, caching, rate limiting, and thread safety.
- JSON-RPC 2.0 Compliance: Full JSON-RPC 2.0 messaging as required by the MCP specification
- Dynamic Module Discovery: Automatic discovery and loading of MCP modules
- Tool and Resource Registration: Registration and lifecycle management for tools and resources
- Multiple Transport Layers: Support for stdio and HTTP transport
- CLI Interface: Command-line access to MCP functionality
- Advanced Error Handling: Detailed error reporting with context information
- Performance Monitoring: Comprehensive metrics and health monitoring
- Caching System: Intelligent result caching with TTL support
- Rate Limiting: Configurable rate limiting per tool
- Concurrent Execution Control: Limits on concurrent tool executions
- Thread Safety: Full thread-safe operations with proper locking
- Enhanced Validation: Comprehensive parameter validation with detailed error messages
- Health Monitoring: Real-time health status and diagnostics
- Memory Management: Efficient memory usage and cleanup
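The caching and thread-safety features can be pictured with a minimal thread-safe TTL cache. This is an illustrative sketch, not the server's actual internals:

```python
import threading
import time
from typing import Any, Dict, Optional, Tuple

class TTLCache:
    """Minimal thread-safe cache with a per-entry time-to-live."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._store: Dict[str, Tuple[float, Any]] = {}  # key -> (expiry, value)

    def set(self, key: str, value: Any, ttl: float) -> None:
        with self._lock:
            self._store[key] = (time.time() + ttl, value)

    def get(self, key: str) -> Optional[Any]:
        with self._lock:
            entry = self._store.get(key)
            if entry is None:
                return None
            expiry, value = entry
            if time.time() > expiry:
                del self._store[key]  # lazily evict expired entries
                return None
            return value

cache = TTLCache()
cache.set("result:my_tool", {"ok": True}, ttl=300.0)
print(cache.get("result:my_tool"))  # {'ok': True} while the entry is fresh
```

The real server layers per-tool TTLs and hit/miss metrics on top of this idea.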
```
src/mcp/
├── mcp.py               # Enhanced core MCP server implementation
├── server_stdio.py      # stdio transport server
├── server_http.py       # HTTP transport server
├── cli.py               # Command-line interface
├── meta_mcp.py          # Meta-tools for server introspection
├── sympy_mcp.py         # SymPy integration tools
├── sympy_mcp_client.py  # SymPy MCP client
├── npx_inspector.py     # NPX inspector utilities
├── README.md            # Basic documentation
└── *.md                 # Additional documentation files
```
Enhanced tool representation with additional metadata:

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

@dataclass
class MCPTool:
    name: str                         # Tool name
    func: Callable                    # Tool function
    schema: Dict[str, Any]            # JSON schema for parameters
    description: str                  # Tool description
    module: str = ""                  # Source module
    category: str = ""                # Tool category
    version: str = "1.0.0"            # Tool version
    tags: List[str] = field(default_factory=list)                 # Tags for categorization
    examples: List[Dict[str, Any]] = field(default_factory=list)  # Usage examples
    deprecated: bool = False          # Deprecation flag
    experimental: bool = False        # Experimental flag
    timeout: Optional[float] = None   # Execution timeout
    max_concurrent: int = 1           # Max concurrent executions
    requires_auth: bool = False       # Authentication requirement
    rate_limit: Optional[float] = None  # Rate limit (requests/second)
    cache_ttl: Optional[float] = None   # Cache TTL
    input_validation: bool = True     # Enable input validation
    output_validation: bool = True    # Enable output validation
```

Enhanced resource representation:
```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional

@dataclass
class MCPResource:
    uri_template: str                 # URI template
    retriever: Callable               # Resource retriever function
    description: str                  # Resource description
    module: str = ""                  # Source module
    category: str = ""                # Resource category
    version: str = "1.0.0"            # Resource version
    mime_type: str = "application/json"  # MIME type
    cacheable: bool = True            # Cacheable flag
    tags: List[str] = field(default_factory=list)  # Tags
    timeout: Optional[float] = None   # Retrieval timeout
    requires_auth: bool = False       # Authentication requirement
    rate_limit: Optional[float] = None  # Rate limit
    cache_ttl: Optional[float] = None   # Cache TTL
    compression: bool = False         # Compression flag
    encryption: bool = False          # Encryption flag
```

Errors carry both the wire-level JSON-RPC code and local context (tool name, module name, optional payload) so server logs can be correlated with client stack traces:
```python
import time
from typing import Any, Optional

class MCPError(Exception):
    """Base class for MCP-related errors."""
    def __init__(self, message: str, code: int = -32000, data: Optional[Any] = None,
                 tool_name: Optional[str] = None, module_name: Optional[str] = None):
        super().__init__(message)
        self.code = code
        self.data = data or {}
        self.tool_name = tool_name
        self.module_name = module_name
        self.timestamp = time.time()

class MCPToolNotFoundError(MCPError):
    """Raised when a requested tool is not found."""

class MCPResourceNotFoundError(MCPError):
    """Raised when a requested resource is not found."""

class MCPInvalidParamsError(MCPError):
    """Raised when tool parameters are invalid."""

class MCPToolExecutionError(MCPError):
    """Raised when tool execution fails."""

class MCPSDKNotFoundError(MCPError):
    """Raised when a required SDK is not found."""

class MCPValidationError(MCPError):
    """Raised when validation fails."""

class MCPModuleLoadError(MCPError):
    """Raised when a module fails to load."""

class MCPPerformanceError(MCPError):
    """Raised when performance thresholds are exceeded."""
```

All runtime knobs live on the singleton `mcp.MCP` instance and are settable through `initialize(...)` or the pipeline step `process_mcp(**kwargs)`. The table below is the authoritative list; any additional configuration must be added to `mcp.mcp.initialize`.
| Knob | Type | Default | Effect |
|---|---|---|---|
| `performance_mode` | `"low"` \| `"medium"` \| `"high"` | `"low"` | Bulk toggle that sets the four booleans below to sensible preset values |
| `enable_caching` | `bool` | mode-derived | Toggle result-cache for tool invocations |
| `enable_rate_limiting` | `bool` | mode-derived | Enforce per-tool RPS limits |
| `strict_validation` | `bool` | mode-derived | Validate every call against its JSON schema |
| `cache_ttl` | `float` (s) | `300.0` | Result-cache TTL |
| `modules_allowlist` | `list[str]` \| `None` | `None` | Restrict discovery to the named modules under `src/` |
| `per_module_timeout` | `float` (s) | `30.0` | Max wall-clock per module during discovery |
| `overall_timeout` | `float` (s) | `120.0` | Max wall-clock for parallel discovery |
| `force_refresh` | `bool` | `False` | Re-run discovery even if modules are already loaded |
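The same knobs can be passed as keyword arguments from Python. The exact keyword names are assumed here to mirror the knob table; treat this as a sketch rather than the verified `initialize` signature:

```python
# Keyword names mirror the knob table; they are assumptions, not a verified API.
mcp_kwargs = {
    "performance_mode": "high",
    "strict_validation": True,
    "cache_ttl": 120.0,
    "modules_allowlist": ["gnn", "execute", "render"],
    "per_module_timeout": 15.0,
    "overall_timeout": 60.0,
}

# On a live server instance this would be:
# mcp_instance.initialize(**mcp_kwargs)
```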
```shell
python src/21_mcp.py \
  --performance-mode high \
  --mcp-strict-validation \
  --mcp-cache-ttl 120 \
  --mcp-modules-allowlist gnn,execute,render \
  --mcp-per-module-timeout 15 \
  --mcp-overall-timeout 60
```

The MCP auto-discovers every module under `src/` that exposes `mcp.py`. A live run currently registers 133 tools and 1 resource across 31 modules. Use the audit script for an up-to-date, ground-truth list:

```shell
PYTHONPATH=src uv run python src/mcp/validate_tools.py
```

Per-module tool counts at last inventory (2026-04-16):
| Module | Tools | Module | Tools |
|---|---|---|---|
| advanced_visualization | 4 | api | 5 |
| analysis | 4 | audio | 5 |
| execute | 5 | export | 4 |
| gnn (+ sympy + meta) | 47 | gui | 3 |
| integration | 4 | intelligent_analysis | 3 |
| llm | 5 | ml_integration | 4 |
| ontology | 4 | render | 4 |
| report | 5 | research | 4 |
| sapf | 4 | security | 4 |
| validation | 4 | visualization | 4 |
| website | 5 | cli | 2 |
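Auto-discovery boils down to scanning `src/*/mcp.py` and importing each file. The following is a simplified sketch of that scan; the real implementation adds per-module timeouts and parallelism:

```python
import importlib.util
from pathlib import Path

def discover_mcp_modules(src_root: str):
    """Yield (module_name, loaded module) for every src/<pkg>/mcp.py found."""
    for mcp_file in sorted(Path(src_root).glob("*/mcp.py")):
        name = mcp_file.parent.name
        spec = importlib.util.spec_from_file_location(f"{name}.mcp", mcp_file)
        module = importlib.util.module_from_spec(spec)
        # Runs the file's top-level code; the real server wraps this in timeouts.
        spec.loader.exec_module(module)
        yield name, module
```

After loading, the server calls each module's `register_tools(mcp_instance)` hook, as shown in the module-development example later in this document.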
- Python 3.8 or higher
- Required dependencies (see requirements.txt)
```shell
# Clone the repository
git clone <repository-url>
cd GeneralizedNotationNotation

# Install dependencies using UV (recommended)
uv sync

# Or install with optional MCP dependencies
uv sync --extra dev

# Verify installation
python -m src.mcp.cli --help
```

stdio transport (recommended for local use):
```shell
python -m src.mcp.cli server --transport stdio
```

HTTP transport (for network access):
```shell
python -m src.mcp.cli server --transport http --host 0.0.0.0 --port 8080
```

List all available tools:
```shell
python -m src.mcp.cli list --format human
```

Execute a tool:
```shell
python -m src.mcp.cli execute get_gnn_files --params '{"target_dir": "doc", "recursive": true}'
```

Get server status:
```shell
python -m src.mcp.cli status --format json
```

The same server can be driven programmatically from Python:

```python
from mcp import MCP

# Create MCP instance
mcp_instance = MCP()

# Initialize with module discovery
mcp_instance.discover_modules()
```

Register a custom tool:

```python
from typing import Any, Dict

def my_tool(param1: str, param2: int) -> Dict[str, Any]:
    return {"result": f"{param1}_{param2}", "success": True}

schema = {
    "type": "object",
    "properties": {
        "param1": {"type": "string", "minLength": 1},
        "param2": {"type": "integer", "minimum": 0}
    },
    "required": ["param1", "param2"]
}

mcp_instance.register_tool(
    name="my_tool",
    func=my_tool,
    schema=schema,
    description="My custom tool",
    module="my_module",
    category="custom",
    version="1.0.0",
    tags=["custom", "example"],
    examples=[{"param1": "hello", "param2": 42}],
    timeout=30.0,
    max_concurrent=5,
    rate_limit=10.0,
    cache_ttl=300.0,
    input_validation=True,
    output_validation=True
)
```

Register a custom resource:

```python
from typing import Any, Dict

def my_resource_retriever(uri: str) -> Dict[str, Any]:
    return {"content": f"Resource content for {uri}", "uri": uri}

mcp_instance.register_resource(
    uri_template="my://{resource_id}",
    retriever=my_resource_retriever,
    description="My custom resource",
    module="my_module",
    category="custom",
    version="1.0.0",
    mime_type="application/json",
    cacheable=True,
    tags=["custom", "resource"],
    timeout=30.0,
    rate_limit=10.0,
    cache_ttl=300.0
)
```

Execute tools and query the server:

```python
# Execute a tool
result = mcp_instance.execute_tool("my_tool", {
    "param1": "hello",
    "param2": 42
})

# Get tool performance statistics
stats = mcp_instance.get_tool_performance_stats("my_tool")

# Get a resource
result = mcp_instance.get_resource("my://my_resource")

# Get enhanced server status
status = mcp_instance.get_enhanced_server_status()

# Get basic server status
basic_status = mcp_instance.get_server_status()

# Get capabilities
capabilities = mcp_instance.get_capabilities()

# Clear caches
cache_stats = mcp_instance.clear_cache()
```

Tool schemas support JSON-Schema-style type, range, and format constraints as well as object composition. Validation runs before the tool handler is invoked:
```python
# Complex schema with validation
schema = {
    "type": "object",
    "properties": {
        "name": {
            "type": "string",
            "minLength": 3,
            "maxLength": 50,
            "pattern": "^[a-zA-Z0-9_]+$"
        },
        "age": {
            "type": "integer",
            "minimum": 0,
            "maximum": 150
        },
        "tags": {
            "type": "array",
            "minItems": 1,
            "maxItems": 10,
            "items": {"type": "string"}
        },
        "settings": {
            "type": "object",
            "properties": {
                "enabled": {"type": "boolean"},
                "timeout": {"type": "number", "minimum": 0}
            },
            "required": ["enabled"]
        }
    },
    "required": ["name", "age"],
    "additionalProperties": False
}
```

The implementation includes intelligent result caching:
```python
# Register tool with caching
mcp_instance.register_tool(
    name="expensive_operation",
    func=expensive_function,
    schema=schema,
    description="An expensive operation with caching",
    cache_ttl=3600.0  # Cache for 1 hour
)

# First execution runs the tool and caches the result
result1 = mcp_instance.execute_tool("expensive_operation", {"param": "value"})

# An identical second call is served from the cache
result2 = mcp_instance.execute_tool("expensive_operation", {"param": "value"})

# Clear cache
cache_stats = mcp_instance.clear_cache()
```

Configure rate limiting per tool:
```python
# Register tool with rate limiting
mcp_instance.register_tool(
    name="rate_limited_tool",
    func=my_function,
    schema=schema,
    description="A rate-limited tool",
    rate_limit=5.0  # 5 requests per second
)

# Calls beyond the limit are rejected
result = mcp_instance.execute_tool("rate_limited_tool", {})
```

Limit concurrent executions:
```python
# Register tool with concurrent limits
mcp_instance.register_tool(
    name="concurrent_limited_tool",
    func=my_function,
    schema=schema,
    description="A tool with concurrent execution limits",
    max_concurrent=3  # Only 3 concurrent executions allowed
)
```

The stdio server provides local process communication:
```python
from mcp.server_stdio import StdioServer

# Create and start stdio server
server = StdioServer()
server.start()
```

Usage:

```shell
python -m src.mcp.cli server --transport stdio
```

The HTTP server provides network-based access:
```python
from http.server import HTTPServer

from mcp.server_http import MCPHTTPHandler

# Create HTTP server
server = HTTPServer(('localhost', 8080), MCPHTTPHandler)
server.serve_forever()
```

Usage:

```shell
python -m src.mcp.cli server --transport http --host 0.0.0.0 --port 8080
```

Query server capabilities:

```json
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "mcp.capabilities",
  "params": {}
}
```

Execute a tool:

```json
{
  "jsonrpc": "2.0",
  "id": 2,
  "method": "mcp.tool.execute",
  "params": {
    "name": "my_tool",
    "params": {
      "param1": "hello",
      "param2": 42
    }
  }
}
```

Get a resource:

```json
{
  "jsonrpc": "2.0",
  "id": 3,
  "method": "mcp.resource.get",
  "params": {
    "uri": "my://my_resource"
  }
}
```

Tools can also be invoked directly:

```json
{
  "jsonrpc": "2.0",
  "id": 4,
  "method": "my_tool",
  "params": {
    "param1": "hello",
    "param2": 42
  }
}
```

Error responses follow the JSON-RPC 2.0 error format:

```json
{
  "jsonrpc": "2.0",
  "id": 1,
  "error": {
    "code": -32602,
    "message": "Parameter 'param1' must be a string",
    "data": {
      "field": "param1",
      "value": 123,
      "tool_name": "my_tool",
      "module_name": "my_module"
    }
  }
}
```

Error codes:

- `-32700`: Parse error
- `-32600`: Invalid Request
- `-32601`: Method not found
- `-32602`: Invalid params
- `-32603`: Internal error
- `-32000`: MCP-specific errors
- `-32001`: SDK not found
- `-32002`: Resource retrieval errors
- `-32003`: Module loading errors
- `-32004`: Performance errors
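A handler can translate an `MCPError` into the wire format above. This sketch assumes the `MCPError` fields defined earlier (a trimmed copy is inlined here so the example is self-contained):

```python
from typing import Any, Dict, Optional

class MCPError(Exception):
    """Trimmed copy of the base error class, for a self-contained demo."""
    def __init__(self, message: str, code: int = -32000, data: Optional[Any] = None,
                 tool_name: Optional[str] = None, module_name: Optional[str] = None):
        super().__init__(message)
        self.code = code
        self.data = data or {}
        self.tool_name = tool_name
        self.module_name = module_name

def error_response(request_id: Any, err: MCPError) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 error envelope carrying the local context."""
    data = dict(err.data)
    if err.tool_name:
        data["tool_name"] = err.tool_name
    if err.module_name:
        data["module_name"] = err.module_name
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": err.code, "message": str(err), "data": data},
    }

resp = error_response(1, MCPError("Parameter 'param1' must be a string",
                                  code=-32602, tool_name="my_tool",
                                  module_name="my_module"))
```

The resulting dictionary matches the error-response example shown earlier in this section.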
Each tool dispatch records latency, cache hit/miss, error counts, and a concurrency snapshot. `get_enhanced_server_status()` returns the aggregated view:

```python
# Get performance metrics
status = mcp_instance.get_enhanced_server_status()
performance = status["performance"]

# Available metrics
print(f"Total requests: {performance['total_requests']}")
print(f"Success rate: {performance['success_rate']:.2%}")
print(f"Average execution time: {performance['average_execution_time']:.3f}s")
print(f"Cache hit ratio: {performance['cache_hit_ratio']:.2%}")
print(f"Concurrent requests: {performance['concurrent_requests']}")
```

Real-time health status:
```python
# Get health status
status = mcp_instance.get_enhanced_server_status()
health = status["health"]

print(f"Status: {health['status']}")  # healthy, degraded, error
print(f"Error rate: {health['error_rate']:.2%}")
print(f"Cache efficiency: {health['cache_efficiency']:.2%}")
print(f"Concurrent load: {health['concurrent_load']:.2%}")
```

Get detailed statistics for specific tools:
```python
# Get tool performance stats
stats = mcp_instance.get_tool_performance_stats("my_tool")
if stats:
    print(f"Execution count: {stats['execution_count']}")
    print(f"Average execution time: {stats['average_execution_time']:.3f}s")
    print(f"Success rate: {stats['success_rate']:.2%}")
    print(f"Error count: {stats['error_count']}")
```

Create a new MCP module by adding an `mcp.py` file to your module directory:
```python
# src/my_module/mcp.py
from typing import Any, Dict

def register_tools(mcp_instance):
    """Register tools for this module."""
    # Define tool function
    def my_tool(param: str) -> Dict[str, Any]:
        return {"result": f"Processed: {param}"}

    # Define schema
    schema = {
        "type": "object",
        "properties": {
            "param": {"type": "string", "minLength": 1}
        },
        "required": ["param"]
    }

    # Register tool
    mcp_instance.register_tool(
        name="my_tool",
        func=my_tool,
        schema=schema,
        description="My custom tool",
        module="my_module",
        category="custom",
        version="1.0.0",
        tags=["custom"],
        examples=[{"param": "example"}],
        timeout=30.0,
        rate_limit=10.0,
        cache_ttl=300.0
    )

    # Register resource
    def my_resource_retriever(uri: str) -> Dict[str, Any]:
        return {"content": f"Resource: {uri}", "uri": uri}

    mcp_instance.register_resource(
        uri_template="my://{resource_id}",
        retriever=my_resource_retriever,
        description="My custom resource",
        module="my_module",
        category="custom",
        version="1.0.0",
        cacheable=True
    )

# Module metadata
__version__ = "1.0.0"
__description__ = "My custom MCP module"
__dependencies__ = ["some_dependency"]
```

Modules are automatically discovered during initialization:
```python
# Discover modules
success = mcp_instance.discover_modules()

# Get module information
module_info = mcp_instance.get_module_info("my_module")
```

Run the MCP test suite:

```shell
# All MCP tests
pytest src/tests/test_mcp_overall.py src/tests/test_mcp_functional.py \
    src/tests/test_mcp_tools.py src/tests/test_mcp_performance.py \
    src/tests/test_mcp_audit.py -v

# Narrower slices
pytest src/tests/test_mcp_tools.py -v        # registration + dispatch
pytest src/tests/test_mcp_performance.py -v  # load / latency
pytest src/tests/test_mcp_audit.py -v        # spec / schema audit
```

- Unit Tests: Core functionality testing
- Integration Tests: End-to-end workflow testing
- Performance Tests: Performance and load testing
- Error Handling Tests: Error scenarios and edge cases
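A unit test for a custom tool can exercise the tool function and its schema contract directly, without a running server. The tool and schema here are hypothetical, mirroring the registration example above:

```python
# test_my_tool.py -- exercises the tool function and its schema contract
# directly; names are hypothetical.
def my_tool(param: str):
    return {"result": f"Processed: {param}"}

SCHEMA = {
    "type": "object",
    "properties": {"param": {"type": "string", "minLength": 1}},
    "required": ["param"],
}

def test_happy_path():
    out = my_tool("abc")
    assert out["result"] == "Processed: abc"

def test_schema_rejects_empty_string():
    # Minimal manual check mirroring what strict_validation enforces.
    params = {"param": ""}
    min_len = SCHEMA["properties"]["param"]["minLength"]
    assert len(params["param"]) < min_len  # this call would be rejected
```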
```python
# Check module status
status = mcp_instance.get_enhanced_server_status()
modules = status["modules"]

for module_name, module_info in modules.items():
    if module_info["status"] != "loaded":
        print(f"Module {module_name}: {module_info['status']}")
        if module_info.get("error_message"):
            print(f"  Error: {module_info['error_message']}")
```

```python
# Check performance metrics
status = mcp_instance.get_enhanced_server_status()
performance = status["performance"]

if performance["success_rate"] < 0.9:
    print("Low success rate detected")
if performance["average_execution_time"] > 5.0:
    print("High execution times detected")
if performance["cache_hit_ratio"] < 0.5:
    print("Low cache efficiency")
```

```python
# Check memory usage
status = mcp_instance.get_enhanced_server_status()
resources = status["resources"]

if resources["cache_size"] > 1000:
    print("Large cache detected, consider clearing")
    mcp_instance.clear_cache()
```

Enable debug logging:
```python
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("mcp")
```

```python
# Use caching for expensive operations
mcp_instance.register_tool(
    name="expensive_operation",
    func=expensive_function,
    schema=schema,
    cache_ttl=3600.0  # Cache for 1 hour
)

# Prevent abuse with rate limiting
mcp_instance.register_tool(
    name="api_call",
    func=api_function,
    schema=schema,
    rate_limit=10.0  # 10 requests per second
)

# Limit resource usage
mcp_instance.register_tool(
    name="resource_intensive",
    func=intensive_function,
    schema=schema,
    max_concurrent=2  # Only 2 concurrent executions
)

# Tools requiring authentication
mcp_instance.register_tool(
    name="sensitive_operation",
    func=sensitive_function,
    schema=schema,
    requires_auth=True
)

# Always enable input and output validation
mcp_instance.register_tool(
    name="safe_tool",
    func=safe_function,
    schema=schema,
    input_validation=True,
    output_validation=True
)

# Prevent abuse of public endpoints
mcp_instance.register_tool(
    name="public_api",
    func=public_function,
    schema=schema,
    rate_limit=5.0  # 5 requests per second
)
```

- Clear Naming: Use descriptive, consistent tool names
- Comprehensive Schemas: Define detailed parameter schemas
- Error Handling: Provide meaningful error messages
- Documentation: Include clear descriptions and examples
- Validation: Enable input and output validation
- Caching: Use caching for expensive operations
- Rate Limiting: Implement appropriate rate limits
- Concurrent Limits: Set reasonable concurrent execution limits
- Monitoring: Monitor performance metrics regularly
- Optimization: Profile and optimize slow operations
- Authentication: Require authentication for sensitive operations
- Validation: Always validate inputs and outputs
- Rate Limiting: Prevent abuse with rate limiting
- Logging: Log security-relevant events
- Updates: Keep dependencies updated
- Structure: Follow the standard module structure
- Metadata: Provide complete module metadata
- Error Handling: Handle errors gracefully
- Testing: Cover happy path, error path, and schema rejection
- Documentation: Document all tools and resources
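The per-tool rate limits recommended above are typically enforced with a token bucket. This is an illustrative standalone sketch, not the project's internal limiter:

```python
import time

class TokenBucket:
    """Allow up to `rate` requests per second, with bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: float) -> None:
        self.rate = rate          # tokens replenished per second
        self.capacity = capacity  # maximum burst size
        self.tokens = capacity
        self.last = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

bucket = TokenBucket(rate=5.0, capacity=5.0)
allowed = [bucket.allow() for _ in range(10)]  # roughly the first 5 pass; the rest are throttled
```

When `allow()` returns `False`, a server would reject the call with an invalid-params or rate-limit error rather than queueing it.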
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests for new functionality
- Update documentation
- Submit a pull request
- Follow PEP 8 style guidelines
- Use type hints throughout
- Include docstrings with parameters, returns, and at least one example
- Write clear, readable code
- Write unit tests for all new functionality
- Include integration tests for complex workflows
- Test error scenarios and edge cases
- Maintain good test coverage
The MCP subsystem exposes every pipeline module as JSON-RPC tools discovered from `src/*/mcp.py`. Wrappers around dispatch add per-tool caching, concurrency caps, rate limiting, and metrics. `get_enhanced_server_status()` is the canonical snapshot for debugging live servers.

For per-module tool inventories, see the `mcp.py` files in each module and the main documentation in `doc/mcp/`.