Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 59 additions & 24 deletions markitdown_mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("markitdown-mcp")

# Exact MIME types for format-specific security validation. Membership is
# tested with `mime_type in <set>`; a substring check like `"xml" in mime_type`
# incorrectly matches Office OpenXML formats (docx, xlsx, pptx — whose MIME
# contains "openxmlformats") and routes them through the XML sanitizer,
# corrupting the binary payload.
#
# The XML set also lists the text-based "+xml" types (SVG, XHTML) so they
# keep receiving XML validation, as they did under the old substring check.
# frozenset: these are constants and must not be mutated at runtime.
_XML_MIME_TYPES = frozenset(
    {
        "text/xml",
        "application/xml",
        "application/xhtml+xml",
        "image/svg+xml",
    }
)
_JSON_MIME_TYPES = frozenset({"application/json", "text/json"})
_CSV_MIME_TYPES = frozenset({"text/csv", "application/csv"})


class SecurityError(Exception):
"""Raised when a security violation is detected."""
Expand Down Expand Up @@ -281,12 +289,13 @@ def validate_file_content_security(file_path: str) -> str:
mime_type, _ = mimetypes.guess_type(file_path)
file_ext = Path(file_path).suffix.lower()

# Apply format-specific validation
if (mime_type and "xml" in mime_type) or file_ext in [".xml", ".xhtml"]:
# Apply format-specific validation. Use exact MIME-type matching to
# avoid false positives (e.g. docx MIME contains "xml" as substring).
if (mime_type in _XML_MIME_TYPES) or file_ext in [".xml", ".xhtml"]:
return validate_xml_security(file_path)
if (mime_type and "json" in mime_type) or file_ext == ".json":
if (mime_type in _JSON_MIME_TYPES) or file_ext == ".json":
return validate_json_security(file_path)
if (mime_type and "csv" in mime_type) or file_ext == ".csv":
if (mime_type in _CSV_MIME_TYPES) or file_ext == ".csv":
return validate_csv_security(file_path)

# General file size check
Expand Down Expand Up @@ -640,13 +649,22 @@ def get_safe_working_directories() -> list[str]:
# Add current working directory
safe_dirs.append(str(Path.cwd()))

# Add home directory subdirectories (but not root directories)
home = Path.home()
safe_subdirs = ["Documents", "Downloads", "Desktop", "tmp"]
for subdir in safe_subdirs:
potential_dir = home / subdir
if potential_dir.exists():
safe_dirs.append(str(potential_dir))
# Home directory subdirectories. Path.home() raises RuntimeError when the
# home directory cannot be resolved (e.g. USERPROFILE and HOMEDRIVE/HOMEPATH
# unset on Windows, or HOME unset with no password-database entry on POSIX),
# which can happen when MCP clients spawn the server with an empty
# environment. Fall back gracefully rather than crashing the server at init.
try:
home = Path.home()
except RuntimeError:
logger.warning("Could not determine user home; skipping home subdirs")
home = None

if home is not None:
safe_subdirs = ["Documents", "Downloads", "Desktop", "tmp"]
for subdir in safe_subdirs:
potential_dir = home / subdir
if potential_dir.exists():
safe_dirs.append(str(potential_dir))

# Add temp directories
temp_dir = Path(tempfile.gettempdir())
Expand All @@ -664,7 +682,8 @@ def get_safe_working_directories() -> list[str]:
class MCPRequest:
"""Represents an incoming MCP protocol request."""

id: str
# JSON-RPC 2.0: id may be a string, a number, or null; it is absent for
# notifications (a missing id is stored as None here).
id: str | int | None
method: str
params: dict[str, Any]

Expand All @@ -673,7 +692,7 @@ class MCPRequest:
class MCPResponse:
"""Represents an MCP protocol response."""

id: str
id: str | int | None
result: dict[str, Any] | None = None
error: dict[str, Any] | None = None

Expand Down Expand Up @@ -758,7 +777,11 @@ def get_tools(self) -> list[dict[str, Any]]:
return [
{
"name": "convert_file",
"description": "Convert a file to Markdown using MarkItDown",
"description": (
"Convert a file to Markdown using MarkItDown. "
"Provide either 'file_path' OR both 'file_content' (base64) "
"and 'filename'."
),
"inputSchema": {
"type": "object",
"properties": {
Expand All @@ -777,10 +800,6 @@ def get_tools(self) -> list[dict[str, Any]]:
"description": "Original filename when using file_content",
},
},
"anyOf": [
{"required": ["file_path"]},
{"required": ["file_content", "filename"]},
],
},
},
{
Expand Down Expand Up @@ -828,7 +847,11 @@ async def handle_request(self, request: MCPRequest) -> MCPResponse:
"tools": [
{
"name": "convert_file",
"description": "Convert a file to Markdown using MarkItDown",
"description": (
"Convert a file to Markdown using MarkItDown. "
"Provide either 'file_path' OR both "
"'file_content' (base64) and 'filename'."
),
"inputSchema": {
"type": "object",
"properties": {
Expand All @@ -849,10 +872,6 @@ async def handle_request(self, request: MCPRequest) -> MCPResponse:
"file_content",
},
},
"anyOf": [
{"required": ["file_path"]},
{"required": ["file_content", "filename"]},
],
},
},
{
Expand Down Expand Up @@ -1259,14 +1278,22 @@ async def run(self) -> None:

try:
message = json.loads(line.strip())

# JSON-RPC 2.0 §4.1: messages without "id" are notifications;
# the server MUST NOT reply to them.
is_notification = "id" not in message

request = MCPRequest(
id=message.get("id", "unknown"),
id=message.get("id"),
method=message["method"],
params=message.get("params", {}),
)

response = await self.handle_request(request)

if is_notification:
continue

# Send response
response_dict: dict[str, Any] = {"jsonrpc": "2.0", "id": response.id}
if response.result is not None:
Expand All @@ -1289,6 +1316,14 @@ async def run(self) -> None:

def main() -> None:
"""Main entry point for console script."""
# Force UTF-8 + LF on stdio. On Windows, Python's text-mode defaults for
# piped stdin/stdout are the ANSI code page (e.g. cp1252) and CRLF
# translation on stdout, both of which break the MCP protocol: clients send
# UTF-8 JSON-RPC framed by LF, so a non-UTF-8 stdin corrupts non-ASCII input
# (e.g. path names with umlauts) and CRLF stdout corrupts the line-delimited
# framing. Has no effect on platforms that already default to UTF-8/LF.
sys.stdout.reconfigure(encoding="utf-8", newline="\n")
sys.stdin.reconfigure(encoding="utf-8")

async def run_server() -> None:
"""Run the MCP server asynchronously."""
Expand Down
Loading