Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions mcp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ See [TUTORIAL.md](TUTORIAL.md) for more examples (including s2n-bignum ARM proof

| Tool | Description | Output |
|------|-------------|--------|
| `eval` | Evaluate arbitrary OCaml/HOL Light code | Raw text (ANSI stripped) |
| `eval` | Evaluate arbitrary OCaml/HOL Light code | Structured JSON (truncated) |
| `set_goal` | Set a proof goal, return initial state | Structured JSON |
| `goal_state` | Return current proof goals | Structured JSON |
| `apply_tactic` | Apply a tactic, return new state or proved theorem | Structured JSON |
Expand All @@ -42,12 +42,16 @@ See [TUTORIAL.md](TUTORIAL.md) for more examples (including s2n-bignum ARM proof
| `backtrack` | Undo tactic steps | Structured JSON |
| `search_theorems` | Search theorem database by name | Structured JSON |
| `hol_type` | Get the type of a term | Raw text |
| `hol_load` | Load a HOL Light file via `needs` | Raw text |
| `hol_load` | Load a HOL Light file via `needs` | Structured JSON |
| `hol_interrupt` | Cancel a long-running command | Status message |
| `hol_restart` | Kill and restart the HOL Light subprocess | Status message |
| `hol_status` | Check process health, uptime, config, checkpoint | Structured JSON |
| `hol_help` | Return tactic reference and proof guide (SKILL.md) | Markdown text |

`eval` returns `{"success", "output", "output_truncated", "full_output_chars", "time_seconds"}`. Large outputs are truncated to `max_output_chars` (default 4000, configurable). Override per-call with `max_output_chars=N`.

`hol_load` returns `{"success", "file", "time_seconds"}` (plus `"error"` on failure). Intermediate output is suppressed — use `eval` with `needs "file.ml"` if you need verbose output.

## Setup

```bash
Expand Down Expand Up @@ -89,6 +93,9 @@ checkpoint = "s2n"

# Timeout in seconds for HOL Light commands.
timeout = 600

# Maximum characters for eval output before truncation.
max_output_chars = 4000
```

Use `hol_status` to verify which config file and checkpoint are active.
Expand Down Expand Up @@ -129,8 +136,8 @@ The server includes a built-in `hol_help` tool that returns the full tactic refe

```bash
cd mcp
uv run pytest test_server.py -v # 35 unit tests
uv run python smoke_test.py # 25 MCP integration checks
uv run pytest test_server.py -v # 38 unit tests
uv run python smoke_test.py # 34 MCP integration checks
```

First run includes HOL Light startup (~75s cold, ~2s with checkpoint).
Expand Down
3 changes: 3 additions & 0 deletions mcp/hol-mcp.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ checkpoint = "noledit"

# Timeout in seconds for HOL Light commands.
timeout = 600

# Maximum characters for eval output before truncation.
max_output_chars = 4000
95 changes: 72 additions & 23 deletions mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def _load_config():
_config, CONFIG_PATH = _load_config()
TIMEOUT = _config.get("timeout", int(os.environ.get("HOL_TIMEOUT", "600")))
CHECKPOINT_NAME = _config.get("checkpoint", os.environ.get("HOL_CHECKPOINT", "noledit"))
MAX_OUTPUT_CHARS = _config.get("max_output_chars", int(os.environ.get("HOL_MAX_OUTPUT", "4000")))

from mcp.server.fastmcp import FastMCP
mcp = FastMCP("hol-light",
Expand Down Expand Up @@ -175,27 +176,29 @@ def _load_helpers():
raise RuntimeError(f"Failed to load MCP helpers: {result}")


def _eval_raw(code: str, timeout: int = None) -> str:
"""Eval code, return raw output. Caller must hold _lock and ensure HOL is started."""
def _eval_raw(code: str, timeout: int = None) -> tuple[str, float]:
"""Eval code, return (output, elapsed_seconds). Caller must hold _lock."""
_drain_queue()
_reader_buf.clear()
full = code.rstrip()
if not full.endswith(";;"):
full += ";;"
full += f'\nPrintf.printf "{SENTINEL}\\n%!";;\n'
t0 = time.time()
_proc.stdin.write(full)
_proc.stdin.flush()
return _wait_for_sentinel(timeout)
result = _wait_for_sentinel(timeout)
return result, round(time.time() - t0, 3)


def _eval_code(code: str, timeout: int = None) -> str:
def _eval_code(code: str, timeout: int = None) -> tuple[str, float]:
with _lock:
_start_hol()
_load_helpers()
return _eval_raw(code, timeout)


def _eval_json(code: str, timeout: int = None) -> str:
def _eval_json(code: str, timeout: int = None) -> tuple[str, float]:
"""Eval OCaml code that produces a string, print it to stdout, return it.
Uses print_string to avoid OCaml's string truncation in REPL output."""
return _eval_code(f'print_string ({code}); print_newline ()', timeout)
Expand All @@ -205,16 +208,48 @@ def _strip_ansi(s: str) -> str:
return ANSI_RE.sub("", s)


def _truncate(s: str, limit: int) -> tuple[str, bool]:
"""Truncate string to limit chars. Returns (result, was_truncated)."""
if len(s) <= limit:
return s, False
return s[:limit] + "... [truncated]", True


def _is_error_output(s: str) -> bool:
"""Heuristic: check if OCaml output indicates an error."""
for marker in ("Error:", "Exception:", "Failure", "Unbound", "Parse error",
"Syntax error", "Type error", "This expression has type"):
if marker in s:
return True
return False


@mcp.tool()
def eval(code: str, timeout: int = None) -> str:
"""Evaluate OCaml/HOL Light code and return the output.
def eval(code: str, timeout: int = None, max_output_chars: int = None) -> str:
"""Evaluate OCaml/HOL Light code and return structured JSON.

Examples:
ARITH_RULE `1 + 1 = 2`;;
TAUT `p /\\ q ==> q /\\ p`;;
search [name "ARITH"];;
Args:
code: OCaml/HOL Light code to evaluate.
timeout: Optional timeout in seconds.
max_output_chars: Max chars for output field (default from config, typically 4000).

Returns JSON:
{"success": bool, "output": str, "output_truncated": bool,
"full_output_chars": int, "time_seconds": float}
"""
return _strip_ansi(_eval_code(code, timeout))
import json as _json
raw, elapsed = _eval_code(code, timeout)
raw = _strip_ansi(raw)
limit = max_output_chars if max_output_chars is not None else MAX_OUTPUT_CHARS
full_len = len(raw)
output, truncated = _truncate(raw, limit)
return _json.dumps({
"success": not _is_error_output(raw),
"output": output,
"output_truncated": truncated,
"full_output_chars": full_len,
"time_seconds": elapsed,
})


@mcp.tool()
Expand All @@ -225,7 +260,7 @@ def goal_state() -> str:
"num_subgoals": N, "total_goals": M}
Returns empty goals list if no proof is in progress.
"""
return _extract_json(_eval_json("mcp_json_goalstate ()"))
return _extract_json(_eval_json("mcp_json_goalstate ()")[0])


@mcp.tool()
Expand All @@ -247,7 +282,7 @@ def apply_tactic(tactic: str, timeout: int = None) -> str:
f'with Failure s -> print_string (mcp_json_error s) '
f'| e -> print_string (mcp_json_error (Printexc.to_string e))); '
f'print_newline ()')
return _extract_json(_eval_code(code, timeout))
return _extract_json(_eval_code(code, timeout)[0])


@mcp.tool()
Expand All @@ -269,7 +304,7 @@ def apply_tactics(tactics: list[str], timeout: int = None) -> str:
return '{"error":"empty tactic list"}'
tac_list = "[" + "; ".join(tactics) + "]"
code = (f'print_string (mcp_json_apply_tactics {tac_list}); print_newline ()')
return _extract_json(_eval_code(code, timeout))
return _extract_json(_eval_code(code, timeout)[0])


@mcp.tool()
Expand All @@ -294,7 +329,7 @@ def prove(goal: str, tactic: str, timeout: int = None) -> str:
f'with Failure s -> print_string (mcp_json_error s) '
f'| e -> print_string (mcp_json_error (Printexc.to_string e))); '
f'print_newline ()')
return _extract_json(_eval_code(code, timeout))
return _extract_json(_eval_code(code, timeout)[0])


@mcp.tool()
Expand All @@ -306,7 +341,7 @@ def backtrack(steps: int = 1) -> str:

Returns JSON goal state or {"error": "..."} if can't back up.
"""
return _extract_json(_eval_json(f"mcp_json_backtrack {steps}"))
return _extract_json(_eval_json(f"mcp_json_backtrack {steps}")[0])


@mcp.tool()
Expand All @@ -319,7 +354,7 @@ def search_theorems(name: str, limit: int = 20) -> str:

Returns JSON array: [{"name": "...", "statement": "..."}, ...]
"""
return _extract_json(_eval_json(f'mcp_json_search "{_ocaml_escape(name)}" {limit}'))
return _extract_json(_eval_json(f'mcp_json_search "{_ocaml_escape(name)}" {limit}')[0])


@mcp.tool()
Expand All @@ -333,7 +368,7 @@ def set_goal(goal: str) -> str:
"""
code = (f'ignore(g({goal})); '
f'print_string (mcp_json_goalstate ()); print_newline ()')
return _extract_json(_eval_code(code))
return _extract_json(_eval_code(code)[0])


@mcp.tool()
Expand All @@ -345,7 +380,7 @@ def hol_type(term: str) -> str:

Returns the type as a string.
"""
return _strip_ansi(_eval_code(f"type_of {term}"))
return _strip_ansi(_eval_code(f"type_of {term}")[0])


@mcp.tool()
Expand All @@ -355,9 +390,21 @@ def hol_load(file: str) -> str:
Args:
file: File path to load (e.g., "Library/words.ml")

Returns the output from loading.
Returns JSON:
{"success": bool, "file": str, "time_seconds": float}
On failure: {"success": false, "file": str, "error": str, "time_seconds": float}
"""
return _strip_ansi(_eval_code(f'needs "{_ocaml_escape(file)}"'))
import json as _json
raw, elapsed = _eval_code(f'needs "{_ocaml_escape(file)}"')
raw = _strip_ansi(raw)
if _is_error_output(raw):
return _json.dumps({
"success": False, "file": file, "error": raw.strip(),
"time_seconds": elapsed,
})
return _json.dumps({
"success": True, "file": file, "time_seconds": elapsed,
})


@mcp.tool()
Expand Down Expand Up @@ -408,7 +455,8 @@ def hol_status() -> str:
"""Check whether the HOL Light subprocess is alive.

Returns JSON: {"alive": bool, "pid": int|null, "checkpoint": str,
"config": str|null, "uptime_seconds": float|null, "timeout": int}
"config": str|null, "uptime_seconds": float|null,
"timeout": int, "max_output_chars": int}
"""
import json
alive = _proc is not None and _proc.poll() is None
Expand All @@ -419,6 +467,7 @@ def hol_status() -> str:
"config": CONFIG_PATH,
"uptime_seconds": round(time.time() - _start_time, 1) if alive and _start_time else None,
"timeout": TIMEOUT,
"max_output_chars": MAX_OUTPUT_CHARS,
})


Expand Down
30 changes: 26 additions & 4 deletions mcp/smoke_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,18 @@ async def main():
check("tools registered", tool_names == ["apply_tactic", "apply_tactics", "backtrack", "eval", "goal_state", "hol_help", "hol_interrupt", "hol_load", "hol_restart", "hol_status", "hol_type", "prove", "search_theorems", "set_goal"],
f"got {tool_names}")

# eval — basic arithmetic
# eval — basic arithmetic (now returns JSON)
r = await session.call_tool("eval", {"code": "ARITH_RULE `1 + 1 = 2`"})
check("eval: ARITH_RULE", "|- 1 + 1 = 2" in r.content[0].text, r.content[0].text)
ev = json.loads(r.content[0].text)
check("eval: ARITH_RULE success", ev["success"] is True, str(ev))
check("eval: ARITH_RULE output", "|- 1 + 1 = 2" in ev["output"], ev["output"])
check("eval: has time_seconds", isinstance(ev["time_seconds"], float), str(ev))
check("eval: has full_output_chars", isinstance(ev["full_output_chars"], int), str(ev))

# eval — truncation
r = await session.call_tool("eval", {"code": 'search [name "ADD"]', "max_output_chars": 100})
ev = json.loads(r.content[0].text)
check("eval: truncation works", ev["output_truncated"] is True and ev["full_output_chars"] > 100, str(ev))

# goal_state — no goal set
r = await session.call_tool("goal_state", {})
Expand Down Expand Up @@ -92,16 +101,27 @@ async def main():
r = await session.call_tool("hol_type", {"term": "`1 + 1`"})
check("hol_type", "num" in r.content[0].text, r.content[0].text)

# hol_load (now returns JSON)
r = await session.call_tool("hol_load", {"file": "Library/iter.ml"})
hl = json.loads(r.content[0].text)
check("hol_load: success", hl["success"] is True, str(hl))
check("hol_load: has time", isinstance(hl["time_seconds"], float), str(hl))

# hol_status
r = await session.call_tool("hol_status", {})
status = json.loads(r.content[0].text)
check("hol_status: alive", status["alive"] is True, str(status))
check("hol_status: has pid", isinstance(status["pid"], int), str(status))
check("hol_status: has max_output_chars", isinstance(status["max_output_chars"], int), str(status))

# hol_help
r = await session.call_tool("hol_help", {})
check("hol_help", "## Core tactics" in r.content[0].text, r.content[0].text[:200])

# hol_interrupt
r = await session.call_tool("hol_interrupt", {})
check("hol_interrupt", "Interrupt sent" in r.content[0].text or "No HOL Light process" in r.content[0].text, r.content[0].text)

# prove tool
r = await session.call_tool("prove", {"goal": "`!n. 0 + n = n`", "tactic": "GEN_TAC THEN REWRITE_TAC[ADD]"})
result = json.loads(r.content[0].text)
Expand All @@ -121,13 +141,15 @@ async def main():

# eval with per-call timeout
r = await session.call_tool("eval", {"code": "1 + 1", "timeout": 30})
check("eval: custom timeout", "2" in r.content[0].text, r.content[0].text)
ev = json.loads(r.content[0].text)
check("eval: custom timeout", "2" in ev["output"], ev["output"])

# hol_restart (run last — kills the process)
r = await session.call_tool("hol_restart", {})
check("hol_restart", "restarted" in r.content[0].text.lower(), r.content[0].text)
r = await session.call_tool("eval", {"code": "1 + 1"})
check("eval after restart", "2" in r.content[0].text, r.content[0].text)
ev = json.loads(r.content[0].text)
check("eval after restart", "2" in ev["output"], ev["output"])

print(f"\n{passed}/{passed + failed} passed")
return failed == 0
Expand Down
Loading
Loading