diff --git a/docs/agent-payment-guard-service.md b/docs/agent-payment-guard-service.md new file mode 100644 index 0000000..bdbe7ac --- /dev/null +++ b/docs/agent-payment-guard-service.md @@ -0,0 +1,78 @@ +# Agent Payment Guard Service (Local HTTP) + +This service exposes ProofPath Agent Payment Guard over local HTTP so an external AI agent runtime can request an authorization decision before execution. + +## Start service + +```bash +python3 examples/agent-payment-guard/payment_guard_service.py --host 127.0.0.1 --port 8787 +``` + +## Health check + +```bash +curl -sS http://127.0.0.1:8787/v1/health | python3 -m json.tool +``` + +Expected response: + +```json +{ + "status": "ok", + "surface": "agent-payment-guard-service", + "version": "0.1" +} +``` + +## Evaluate a proposal (enforce mode) + +```bash +curl -sS -X POST http://127.0.0.1:8787/v1/payment-proposals/evaluate \ + -H 'content-type: application/json' \ + -d "$(python3 - <<'PY' +import json +proposal = json.load(open('examples/agent-payment-guard/payment_proposal.valid_micro_payment.json', encoding='utf-8')) +envelope = json.load(open('examples/agent-payment-guard/intent_envelopes/intent.valid.json', encoding='utf-8')) +print(json.dumps({'mode':'enforce','proposal':proposal,'intent_envelope':envelope})) +PY +)" | python3 -m json.tool +``` + +Enforce semantics: +- `ACCEPT` => `execution_allowed=true`, `would_block=false` +- `HOLD` / `BLOCK` => `execution_allowed=false`, `would_block=true` + +## Evaluate a proposal (shadow mode) + +```bash +curl -sS -X POST http://127.0.0.1:8787/v1/payment-proposals/evaluate \ + -H 'content-type: application/json' \ + -d "$(python3 - <<'PY' +import json +proposal = json.load(open('examples/agent-payment-guard/payment_proposal.asset_not_allowed.json', encoding='utf-8')) +print(json.dumps({'mode':'shadow','proposal':proposal,'intent_envelope':None})) +PY +)" | python3 -m json.tool +``` + +Shadow semantics: +- `ACCEPT` => `execution_allowed=true`, `would_block=false` +- `HOLD` / `BLOCK` => `execution_allowed=true`, `would_block=true` + +Shadow mode always writes an audit record with actual decision and reason. + +## Read recent audit records + +```bash +curl -sS 'http://127.0.0.1:8787/v1/audit/records?limit=20' | python3 -m json.tool +``` + +Records come from `.proofpath/audit.jsonl` and include hash chaining (`previous_hash`, `hash`). + +## Local validation + +```bash +bash examples/agent-payment-guard/run_demo_check.sh +bash examples/agent-payment-guard/run_service_check.sh +python3 scripts/verify_audit_log.py .proofpath/audit.jsonl +``` diff --git a/examples/agent-payment-guard/payment_guard_service.py b/examples/agent-payment-guard/payment_guard_service.py new file mode 100644 index 0000000..971be09 --- /dev/null +++ b/examples/agent-payment-guard/payment_guard_service.py @@ -0,0 +1,173 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +from http import HTTPStatus +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path +from typing import Any, Dict, List, Tuple +from urllib.parse import parse_qs, urlparse + +from payment_guard import append_audit, decide, get_previous_hash, load_json + + +DEFAULT_HOST = "127.0.0.1" +DEFAULT_PORT = 8787 + + +def parse_mode(raw: Any) -> str: + mode = str(raw or "enforce").strip().lower() + if mode not in {"enforce", "shadow"}: + raise ValueError("mode must be 'enforce' or 'shadow'") + return mode + + +def execution_flags(mode: str, decision: str) -> Tuple[bool, bool]: + if mode == "enforce": + allowed = decision == "ACCEPT" + return allowed, not allowed + + # shadow mode + would_block = decision in {"HOLD", "BLOCK"} + return True, would_block + + +def read_audit_records(path: Path, limit: int) -> List[Dict[str, Any]]: + if not path.exists(): + return [] + lines = [line for line in path.read_text(encoding="utf-8").splitlines() if line.strip()] + selected = lines[-limit:] + return [json.loads(line) for line in selected] + + +class PaymentGuardServiceHandler(BaseHTTPRequestHandler): + server_version = "AgentPaymentGuardService/0.1" + + def do_GET(self) -> None: # noqa: N802 + parsed = urlparse(self.path) + if parsed.path == "/v1/health": + self._send_json( + HTTPStatus.OK, + {"status": "ok", "surface": "agent-payment-guard-service", "version": "0.1"}, + ) + return + + if parsed.path == "/v1/audit/records": + query = parse_qs(parsed.query) + try: + limit = int(query.get("limit", ["20"])[0]) + except ValueError: + self._send_json(HTTPStatus.BAD_REQUEST, {"error": "limit must be an integer"}) + return + limit = min(max(limit, 1), 100) + records = read_audit_records(self.server.audit_path, limit) + self._send_json(HTTPStatus.OK, {"records": records, "count": len(records), "limit": limit}) + return + + self._send_json(HTTPStatus.NOT_FOUND, {"error": "not found"}) + + def do_POST(self) -> None: # noqa: N802 + parsed = urlparse(self.path) + if parsed.path != "/v1/payment-proposals/evaluate": + self._send_json(HTTPStatus.NOT_FOUND, {"error": "not found"}) + return + + payload = self._read_json_body() + if isinstance(payload, tuple): + status, body = payload + self._send_json(status, body) + return + + try: + mode = parse_mode(payload.get("mode")) + except ValueError as exc: + self._send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)}) + return + + proposal = payload.get("proposal") + if not isinstance(proposal, dict): + self._send_json(HTTPStatus.BAD_REQUEST, {"error": "proposal must be an object"}) + return + + envelope = payload.get("intent_envelope") + if envelope is not None and not isinstance(envelope, dict): + self._send_json(HTTPStatus.BAD_REQUEST, {"error": "intent_envelope must be an object or null"}) + return + + decision, reason, intent_meta = decide( + proposal=proposal, + policy=self.server.policy, + envelope=envelope, + strict_mode=(mode == "enforce"), + audit_path=self.server.audit_path, + ) + append_audit(self.server.audit_path, proposal, decision, reason, intent_meta) + audit_hash = get_previous_hash(self.server.audit_path) + execution_allowed, would_block = execution_flags(mode, decision) + + self._send_json( + HTTPStatus.OK, + { + "mode": mode, + "decision": decision, + "reason": reason, + "execution_allowed": execution_allowed, + "would_block": would_block, + "audit_hash": audit_hash, + }, + ) + + def _read_json_body(self) -> Dict[str, Any] | Tuple[HTTPStatus, Dict[str, str]]: + try: + content_length = int(self.headers.get("Content-Length", "0")) + except ValueError: + return HTTPStatus.BAD_REQUEST, {"error": "invalid content length"} + + raw = self.rfile.read(content_length) + try: + payload = json.loads(raw.decode("utf-8")) + except (UnicodeDecodeError, json.JSONDecodeError): + return HTTPStatus.BAD_REQUEST, {"error": "invalid JSON body"} + + if not isinstance(payload, dict): + return HTTPStatus.BAD_REQUEST, {"error": "request body must be an object"} + return payload + + def _send_json(self, status: HTTPStatus, payload: Dict[str, Any]) -> None: + encoded = json.dumps(payload, separators=(",", ":")).encode("utf-8") + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(encoded))) + self.end_headers() + self.wfile.write(encoded) + + def log_message(self, format: str, *args: Any) -> None: + return + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--host", default=DEFAULT_HOST) + parser.add_argument("--port", type=int, default=DEFAULT_PORT) + parser.add_argument("--policy", default="examples/agent-payment-guard/payment_policy.json") + parser.add_argument("--audit-path", default=".proofpath/audit.jsonl") + args = parser.parse_args() + + policy = load_json(Path(args.policy)) + server = ThreadingHTTPServer((args.host, args.port), PaymentGuardServiceHandler) + server.policy = policy + server.audit_path = Path(args.audit_path) + + print(f"Agent Payment Guard service listening on http://{args.host}:{args.port}") + try: + server.serve_forever() + except KeyboardInterrupt: + pass + finally: + server.server_close() + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/examples/agent-payment-guard/run_service_check.sh b/examples/agent-payment-guard/run_service_check.sh new file mode 100755 index 0000000..b877470 --- /dev/null +++ b/examples/agent-payment-guard/run_service_check.sh @@ -0,0 +1,46 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)" +cd "$ROOT_DIR" + +rm -f .proofpath/audit.jsonl + +HOST="127.0.0.1" +PORT="18787" +SERVICE="examples/agent-payment-guard/payment_guard_service.py" + +python3 "$SERVICE" --host "$HOST" --port "$PORT" >/tmp/payment_guard_service.log 2>&1 & +SERVICE_PID=$! +cleanup() { + kill "$SERVICE_PID" >/dev/null 2>&1 || true +} +trap cleanup EXIT + +for _ in $(seq 1 50); do + if curl -fsS "http://$HOST:$PORT/v1/health" >/dev/null; then + break + fi + sleep 0.1 +done +curl -fsS "http://$HOST:$PORT/v1/health" | python3 -c 'import json,sys; r=json.load(sys.stdin); assert r["status"]=="ok"; assert r["surface"]=="agent-payment-guard-service"; assert r["version"]=="0.1"' + +VALID_INTENT=$(cat examples/agent-payment-guard/intent_envelopes/intent.valid.json) +VALID_PROPOSAL=$(cat examples/agent-payment-guard/payment_proposal.valid_micro_payment.json) + +curl -fsS -X POST "http://$HOST:$PORT/v1/payment-proposals/evaluate" \ + -H 'content-type: application/json' \ + -d "{\"mode\":\"enforce\",\"proposal\":$VALID_PROPOSAL,\"intent_envelope\":$VALID_INTENT}" \ + | python3 -c 'import json,sys; r=json.load(sys.stdin); assert r["mode"]=="enforce"; assert r["decision"]=="ACCEPT"; assert r["execution_allowed"] is True; assert r["would_block"] is False; assert r["audit_hash"].startswith("sha256:")' + +BAD_PROPOSAL=$(cat examples/agent-payment-guard/payment_proposal.asset_not_allowed.json) +curl -fsS -X POST "http://$HOST:$PORT/v1/payment-proposals/evaluate" \ + -H 'content-type: application/json' \ + -d "{\"mode\":\"shadow\",\"proposal\":$BAD_PROPOSAL,\"intent_envelope\":null}" \ + | python3 -c 'import json,sys; r=json.load(sys.stdin); assert r["mode"]=="shadow"; assert r["decision"]=="BLOCK"; assert r["execution_allowed"] is True; assert r["would_block"] is True; assert r["audit_hash"].startswith("sha256:")' + +curl -fsS "http://$HOST:$PORT/v1/audit/records" | python3 -c 'import json,sys; r=json.load(sys.stdin); assert r["count"]==2; assert len(r["records"])==2; assert r["records"][0]["decision"]=="ACCEPT"; assert r["records"][1]["decision"]=="BLOCK"' + +python3 scripts/verify_audit_log.py .proofpath/audit.jsonl >/dev/null + +echo "Agent Payment Guard service check passed."