diff --git a/README.md b/README.md index 86792ce..067fc38 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,8 @@ See `docs/TESTING.md` for install and CLI smoke tests. 3. ACP calls an OpenAI-compatible LLM API to rank candidates. 4. Core validates and filters LLM output. +The flow now returns explicit LLM diagnostics and fallback metadata. When ACP falls back to deterministic core behavior, the response includes `llm_used`, `llm_status`, `fallback_reason`, `fallback_mode`, and a small `diagnostics` object instead of silently degrading. + For details on the design, see `docs/PHENOTYPE_RECOMMENDATION_DESIGN.md`. ### `phenotype_improvements` flow (ACP + MCP + LLM) @@ -198,12 +200,18 @@ Recommended contents of `.env`: EMBED_API_KEY= EMBED_MODEL= EMBED_URL=http://172.17.0.1:3000/ollama/api/embed # or equivalent +EMBED_TIMEOUT=120 LLM_API_KEY= LLM_API_URL=http://172.17.0.1:3000/api/chat/completions # or equivalent LLM_MODEL= LLM_LOG=1 LLM_USE_RESPONSES=0 -LLM_TIMEOUT=180 +LLM_TIMEOUT=300 +STUDY_AGENT_MCP_TIMEOUT=240 +ACP_TIMEOUT=360 +LLM_CANDIDATE_LIMIT=5 +LLM_RECOMMENDATION_TOP_K=20 +LLM_RECOMMENDATION_MAX_RESULTS=3 STUDY_AGENT_ALLOW_CORE_FALLBACK=0 STUDY_AGENT_DEBUG=1 ``` @@ -243,6 +251,51 @@ Notes: - ACP is exposed on port 8765 and MCP on port 8790. - The phenotype index is mounted from `./data/phenotype_index` into MCP at `/data/phenotype_index`. +### Constrained deployment guidance + +Recommended timeout ladder: + +- `ACP_TIMEOUT=360` +- `LLM_TIMEOUT=300` +- `STUDY_AGENT_MCP_TIMEOUT=240` +- `EMBED_TIMEOUT=120` to `180` + +Recommendation tuning knobs: + +- `LLM_USE_RESPONSES=0` is the default and the recommended setting when `LLM_API_URL` points at `/api/chat/completions`. +- `LLM_CANDIDATE_LIMIT=5` trims recommendation prompts before the LLM call. +- `LLM_RECOMMENDATION_TOP_K=20` controls retrieval breadth before truncation. +- `LLM_RECOMMENDATION_MAX_RESULTS=3` keeps early recommendation outputs small and easier to parse. + +Recommendation responses now include explicit fallback metadata and diagnostics. Example parse-failure fallback: + +```json +{ + "status": "ok", + "llm_used": false, + "llm_status": "json_parse_failed", + "fallback_reason": "llm_json_parse_failed", + "fallback_mode": "stub", + "candidate_limit": 5, + "candidate_count": 5, + "diagnostics": { + "llm_status": "json_parse_failed", + "llm_duration_seconds": 12.5, + "llm_error": "json_parse_failed", + "llm_parse_stage": "chat_completions_content:json_loads", + "llm_schema_valid": false + } +} +``` + +You can also derive environment-specific timeout starting values with: + +```bash +doit calibrate_timeouts +``` + +The calibration task samples the main structured phenotype flows, compares multiple candidate limits, and writes both a recommended timeout `.env` fragment and a JSON timing report. 
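+
+A minimal sketch of the generated `.env` fragment (the header comment and keys come from the calibration script; the numeric values here are illustrative and will differ per environment):
+
+```
+# Generated by scripts/calibrate_timeouts.py
+EMBED_TIMEOUT=120
+STUDY_AGENT_MCP_TIMEOUT=240
+LLM_TIMEOUT=300
+ACP_TIMEOUT=360
+```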
+ Detailed tests can be found in `docs/TESTING.md` but this one is useful for a quick check that a tool that uses the chat completion is functioning reachable: ``` # phenotype_intent_split diff --git a/acp_agent/README.md b/acp_agent/README.md index faed92e..6c372db 100644 --- a/acp_agent/README.md +++ b/acp_agent/README.md @@ -14,10 +14,23 @@ Set these environment variables to enable LLM calls from ACP: - `LLM_API_URL` (default `http://localhost:3000/api/chat/completions`) - `LLM_API_KEY` (required) - `LLM_MODEL` (default `agentstudyassistant`) -- `LLM_TIMEOUT` (default `180`) +- `LLM_TIMEOUT` (default `300`) - `LLM_LOG` (default `0`) - `LLM_DRY_RUN` (default `0`) - `LLM_USE_RESPONSES` (default `0`, use OpenAI Responses API payload/parse instead of Chat Completions; unrelated to MCP tool use) -- `LLM_CANDIDATE_LIMIT` (default `10`) +- `LLM_CANDIDATE_LIMIT` (default `5`) +- `LLM_RECOMMENDATION_TOP_K` (default `20`) +- `LLM_RECOMMENDATION_MAX_RESULTS` (default `3`) +- `STUDY_AGENT_MCP_TIMEOUT` (default `240`) +- `EMBED_TIMEOUT` (default `120`) + +Recommended timeout ladder for constrained deployments: + +- `ACP_TIMEOUT > LLM_TIMEOUT > STUDY_AGENT_MCP_TIMEOUT` +- Recommended starting point: `ACP_TIMEOUT=360`, `LLM_TIMEOUT=300`, `STUDY_AGENT_MCP_TIMEOUT=240` + +ACP recommendation flows now expose explicit diagnostics and fallback metadata including `llm_status`, `llm_duration_seconds`, `llm_parse_stage`, `llm_schema_valid`, `fallback_reason`, and `fallback_mode`. + +To estimate environment-specific starting values instead of relying only on defaults, run `doit calibrate_timeouts`. It writes a recommended timeout env fragment and a JSON timing summary based on repeated ACP smoke-flow samples. See `docs/TESTING.md` for CLI smoke tests. diff --git a/acp_agent/study_agent_acp/agent.py b/acp_agent/study_agent_acp/agent.py index 69113e7..e219db3 100644 --- a/acp_agent/study_agent_acp/agent.py +++ b/acp_agent/study_agent_acp/agent.py @@ -18,6 +18,7 @@ propose_concept_set_diff, ) from .llm_client import ( + LLMCallResult, build_intent_split_prompt, build_advice_prompt, build_improvements_prompt, @@ -25,6 +26,8 @@ build_lint_prompt, build_prompt, call_llm, + coerce_llm_call_result, + llm_result_payload, ) @@ -65,6 +68,56 @@ def __init__( "phenotype_intent_split": PhenotypeIntentSplitInput.model_json_schema(), } + def _debug_enabled(self) -> bool: + return os.getenv("STUDY_AGENT_DEBUG", "0") == "1" + + def _log_debug(self, message: str) -> None: + if self._debug_enabled(): + print(f"ACP DEBUG > {message}") + + def _llm_diagnostics(self, result: Optional[LLMCallResult]) -> Dict[str, Any]: + if result is None: + return { + "llm_status": "disabled", + "llm_duration_seconds": 0.0, + "llm_error": "llm_result_missing", + "llm_parse_stage": None, + "llm_schema_valid": False, + } + diagnostics = { + "llm_status": result.status, + "llm_duration_seconds": result.duration_seconds, + "llm_error": result.error, + "llm_parse_stage": result.parse_stage, + "llm_schema_valid": bool(result.schema_valid) if result.schema_valid is not None else result.status == "ok", + "llm_request_mode": result.request_mode, + } + if result.missing_keys: + diagnostics["llm_missing_keys"] = result.missing_keys + if os.getenv("LLM_LOG_RESPONSE", "0") == "1": + diagnostics["llm_raw_response"] = result.raw_response + diagnostics["llm_content_text"] = result.content_text + return diagnostics + + def _fallback_reason_for_llm(self, result: Optional[LLMCallResult]) -> str: + if result is None: + return "llm_empty_result" + mapping = { + 
"timeout": "llm_timeout", + "http_error": "llm_http_error", + "transport_error": "llm_transport_error", + "json_parse_failed": "llm_json_parse_failed", + "schema_mismatch": "llm_schema_mismatch", + "disabled": "llm_disabled", + } + return mapping.get(result.status, "llm_empty_result") + + def _call_llm(self, prompt: str, required_keys: Optional[List[str]] = None) -> LLMCallResult: + try: + return coerce_llm_call_result(call_llm(prompt, required_keys=required_keys)) + except TypeError: + return coerce_llm_call_result(call_llm(prompt)) + def list_tools(self) -> List[Dict[str, Any]]: if self._mcp_client is not None: return self._mcp_client.list_tools() @@ -126,8 +179,8 @@ def call_tool(self, name: str, arguments: Dict[str, Any], confirm: bool = False) def run_phenotype_recommendation_flow( self, study_intent: str, - top_k: int = 20, - max_results: int = 10, + top_k: Optional[int] = None, + max_results: Optional[int] = None, candidate_limit: Optional[int] = None, candidate_offset: Optional[int] = None, ) -> Dict[str, Any]: @@ -135,15 +188,21 @@ def run_phenotype_recommendation_flow( return {"status": "error", "error": "missing study_intent"} if self._mcp_client is None: return {"status": "error", "error": "MCP client unavailable"} + if top_k is None: + top_k = int(os.getenv("LLM_RECOMMENDATION_TOP_K", "20")) + if max_results is None: + max_results = int(os.getenv("LLM_RECOMMENDATION_MAX_RESULTS", "3")) search_args = {"query": study_intent, "top_k": top_k} if candidate_offset is not None: search_args["offset"] = int(candidate_offset) + self._log_debug(f"phenotype_recommendation: phenotype_search start top_k={top_k} offset={candidate_offset or 0}") search_result = self.call_tool( name="phenotype_search", arguments=search_args, ) + self._log_debug(f"phenotype_recommendation: phenotype_search end status={search_result.get('status')}") if search_result.get("status") != "ok": return { "status": "error", @@ -170,16 +229,24 @@ def run_phenotype_recommendation_flow( "error": "phenotype_search_failed", "details": full, } - candidates = full.get("results") or [] + all_candidates = full.get("results") or [] if candidate_limit is None: - candidate_limit = int(os.getenv("LLM_CANDIDATE_LIMIT", "10")) + candidate_limit = int(os.getenv("LLM_CANDIDATE_LIMIT", "5")) + pre_truncation_count = len(all_candidates) + candidates = all_candidates if candidate_limit > 0: candidates = candidates[:candidate_limit] + self._log_debug( + "phenotype_recommendation: candidate counts " + f"before={pre_truncation_count} after={len(candidates)} limit={candidate_limit}" + ) + self._log_debug("phenotype_recommendation: prompt bundle fetch start") prompt_bundle = self.call_tool( name="phenotype_prompt_bundle", arguments={"task": "phenotype_recommendations"}, ) + self._log_debug(f"phenotype_recommendation: prompt bundle fetch end status={prompt_bundle.get('status')}") prompt_full = prompt_bundle.get("full_result") or {} if prompt_bundle.get("status") != "ok" or prompt_full.get("error"): return { @@ -196,7 +263,14 @@ def run_phenotype_recommendation_flow( candidates=candidates, max_results=max_results, ) - llm_result = call_llm(prompt) + self._log_debug( + f"phenotype_recommendation: llm start prompt_chars={len(prompt)} candidate_count={len(candidates)}" + ) + llm_result = self._call_llm(prompt, required_keys=["plan", "phenotype_recommendations"]) + self._log_debug( + "phenotype_recommendation: llm end " + f"status={llm_result.status} seconds={llm_result.duration_seconds:.2f} parse_stage={llm_result.parse_stage}" + ) catalog_rows = [] 
for row in candidates: if not isinstance(row, dict): @@ -208,22 +282,34 @@ def run_phenotype_recommendation_flow( "short_description": row.get("short_description"), } ) + llm_payload = llm_result_payload(llm_result) core_result = phenotype_recommendations( protocol_text=study_intent, catalog_rows=catalog_rows, max_results=max_results, - llm_result=llm_result, + llm_result=llm_payload, ) + llm_used = llm_payload is not None + fallback_reason = None if llm_used else self._fallback_reason_for_llm(llm_result) + fallback_mode = None if llm_used else core_result.get("mode") + if fallback_reason: + self._log_debug(f"phenotype_recommendation: fallback chosen reason={fallback_reason} mode={fallback_mode}") return { "status": "ok", "search": full, - "llm_used": llm_result is not None, + "llm_used": llm_used, + "llm_status": llm_result.status, + "fallback_reason": fallback_reason, + "fallback_mode": fallback_mode, "candidate_limit": candidate_limit, "candidate_offset": candidate_offset or 0, "candidate_count": len(candidates), + "candidate_count_before_truncation": pre_truncation_count, + "prompt_length_chars": len(prompt), "recommendations": core_result, + "diagnostics": self._llm_diagnostics(llm_result), } def run_phenotype_recommendation_advice_flow( @@ -253,16 +339,21 @@ def run_phenotype_recommendation_advice_flow( output_schema=prompt_full.get("output_schema", {}), study_intent=study_intent, ) - llm_result = call_llm(prompt) + llm_result = self._call_llm(prompt, required_keys=["advice"]) + llm_payload = llm_result_payload(llm_result) core_result = phenotype_recommendation_advice( study_intent=study_intent, - llm_result=llm_result, + llm_result=llm_payload, ) return { "status": "ok", - "llm_used": llm_result is not None, + "llm_used": llm_payload is not None, + "llm_status": llm_result.status, + "fallback_reason": None if llm_payload is not None else self._fallback_reason_for_llm(llm_result), + "fallback_mode": None if llm_payload is not None else core_result.get("mode"), "advice": core_result, + "diagnostics": self._llm_diagnostics(llm_result), } def run_phenotype_intent_split_flow( @@ -273,8 +364,6 @@ def run_phenotype_intent_split_flow( return {"status": "error", "error": "missing study_intent"} if self._mcp_client is None: return {"status": "error", "error": "MCP client unavailable"} - debug = os.getenv("STUDY_AGENT_DEBUG", "0") == "1" - prompt_bundle = self.call_tool( name="phenotype_intent_split", arguments={}, @@ -293,25 +382,30 @@ def run_phenotype_intent_split_flow( output_schema=prompt_full.get("output_schema", {}), study_intent=study_intent, ) - if debug: - print("ACP DEBUG > phenotype_intent_split: calling LLM") - llm_result = call_llm(prompt) - if debug: - print("ACP DEBUG > phenotype_intent_split: LLM returned") - if llm_result is None: + self._log_debug("phenotype_intent_split: calling LLM") + llm_result = self._call_llm(prompt, required_keys=["target_statement", "outcome_statement", "rationale"]) + self._log_debug( + "phenotype_intent_split: LLM returned " + f"status={llm_result.status} parse_stage={llm_result.parse_stage}" + ) + llm_payload = llm_result_payload(llm_result) + if llm_payload is None: return { "status": "error", "error": "llm_unavailable", + "diagnostics": self._llm_diagnostics(llm_result), } core_result = phenotype_intent_split( study_intent=study_intent, - llm_result=llm_result, + llm_result=llm_payload, ) return { "status": "ok", - "llm_used": llm_result is not None, + "llm_used": True, + "llm_status": llm_result.status, "intent_split": core_result, + 
"diagnostics": self._llm_diagnostics(llm_result), } def run_phenotype_improvements_flow( @@ -343,7 +437,8 @@ def run_phenotype_improvements_flow( study_intent=protocol_text, cohorts=cohorts, ) - llm_result = call_llm(prompt) + llm_result = coerce_llm_call_result(call_llm(prompt)) + llm_payload = llm_result_payload(llm_result) result = self.call_tool( name="phenotype_improvements", @@ -351,11 +446,13 @@ def run_phenotype_improvements_flow( "protocol_text": protocol_text, "cohorts": cohorts, "characterization_previews": characterization_previews or [], - "llm_result": llm_result, + "llm_result": llm_payload, }, ) if isinstance(result, dict): - result.setdefault("llm_used", llm_result is not None) + result.setdefault("llm_used", llm_payload is not None) + result.setdefault("llm_status", llm_result.status) + result.setdefault("diagnostics", self._llm_diagnostics(llm_result)) result.setdefault("cohort_count", len(cohorts)) return result @@ -385,17 +482,20 @@ def run_concept_sets_review_flow( payload={"concept_set": concept_set, "study_intent": study_intent}, max_kb=15, ) - llm_result = call_llm(prompt) + llm_result = coerce_llm_call_result(call_llm(prompt)) + llm_payload = llm_result_payload(llm_result) result = self.call_tool( name="propose_concept_set_diff", arguments={ "concept_set": concept_set, "study_intent": study_intent, - "llm_result": llm_result, + "llm_result": llm_payload, }, ) if isinstance(result, dict): - result.setdefault("llm_used", llm_result is not None) + result.setdefault("llm_used", llm_payload is not None) + result.setdefault("llm_status", llm_result.status) + result.setdefault("diagnostics", self._llm_diagnostics(llm_result)) return result def run_cohort_critique_general_design_flow( @@ -423,16 +523,19 @@ def run_cohort_critique_general_design_flow( payload={"cohort": cohort}, max_kb=15, ) - llm_result = call_llm(prompt) + llm_result = coerce_llm_call_result(call_llm(prompt)) + llm_payload = llm_result_payload(llm_result) result = self.call_tool( name="cohort_lint", arguments={ "cohort": cohort, - "llm_result": llm_result, + "llm_result": llm_payload, }, ) if isinstance(result, dict): - result.setdefault("llm_used", llm_result is not None) + result.setdefault("llm_used", llm_payload is not None) + result.setdefault("llm_status", llm_result.status) + result.setdefault("diagnostics", self._llm_diagnostics(llm_result)) return result def run_phenotype_validation_review_flow( @@ -491,14 +594,17 @@ def run_phenotype_validation_review_flow( system_prompt=system_prompt, main_prompt=main_prompt, ) - llm_result = call_llm(prompt) + llm_result = coerce_llm_call_result(call_llm(prompt)) + llm_payload = llm_result_payload(llm_result) parsed = self.call_tool( name="keeper_parse_response", - arguments={"llm_output": llm_result}, + arguments={"llm_output": llm_payload}, ) if isinstance(parsed, dict): - parsed.setdefault("llm_used", llm_result is not None) + parsed.setdefault("llm_used", llm_payload is not None) + parsed.setdefault("llm_status", llm_result.status) + parsed.setdefault("diagnostics", self._llm_diagnostics(llm_result)) return parsed def _wrap_result(self, name: str, result: Dict[str, Any], warnings: List[str]) -> Dict[str, Any]: diff --git a/acp_agent/study_agent_acp/llm_client.py b/acp_agent/study_agent_acp/llm_client.py index b8695b4..338073c 100644 --- a/acp_agent/study_agent_acp/llm_client.py +++ b/acp_agent/study_agent_acp/llm_client.py @@ -2,13 +2,41 @@ import json import os +import re +import socket import time import urllib.error import urllib.request -from typing 
import Any, Dict, Optional +from dataclasses import asdict, dataclass, field +from typing import Any, Dict, Optional, Sequence from study_agent_core.net import rewrite_container_host_url +_JSON_FENCE_RE = re.compile(r"^\s*```(?:json)?\s*|\s*```\s*$", re.IGNORECASE | re.MULTILINE) +_REASONING_PREFIX_RE = re.compile(r"^\s*(?:<[^>]+>\s*)+", re.DOTALL) + + +@dataclass +class LLMCallResult: + status: str + raw_response: Optional[str] = None + parsed_content: Optional[Dict[str, Any]] = None + content_text: Optional[str] = None + http_status: Optional[int] = None + duration_seconds: float = 0.0 + error: Optional[str] = None + parse_stage: Optional[str] = None + request_mode: str = "chat_completions" + schema_valid: Optional[bool] = None + missing_keys: list[str] = field(default_factory=list) + + def to_dict(self, include_raw: bool = False) -> Dict[str, Any]: + payload = asdict(self) + if not include_raw: + payload.pop("raw_response", None) + return payload + + def build_prompt( overview: str, spec: str, @@ -176,6 +204,7 @@ def build_intent_split_prompt( ] return "\n\n".join([s for s in sections if s]) + def build_keeper_prompt( overview: str, spec: str, @@ -205,46 +234,178 @@ def build_keeper_prompt( return "\n\n".join([s for s in sections if s]) -def _extract_json_object(text: str) -> Optional[Dict[str, Any]]: +def _normalize_content_text(text: Optional[str]) -> str: + normalized = str(text or "").strip() + if not normalized: + return "" + normalized = _JSON_FENCE_RE.sub("", normalized).strip() + normalized = _REASONING_PREFIX_RE.sub("", normalized).strip() + first_json = normalized.find("{") + if first_json > 0: + normalized = normalized[first_json:] + return normalized + + +def _extract_json_object(text: str) -> Optional[str]: if not text: return None start = text.find("{") - end = text.rfind("}") - if start == -1 or end == -1 or end <= start: + if start == -1: return None + depth = 0 + in_string = False + escape = False + for idx in range(start, len(text)): + ch = text[idx] + if in_string: + if escape: + escape = False + elif ch == "\\": + escape = True + elif ch == '"': + in_string = False + continue + if ch == '"': + in_string = True + elif ch == "{": + depth += 1 + elif ch == "}": + depth -= 1 + if depth == 0: + return text[start : idx + 1] + return None + + +def _parse_json_content(text: Optional[str]) -> tuple[Optional[Dict[str, Any]], Optional[str], Optional[str]]: + normalized = _normalize_content_text(text) + if not normalized: + return None, normalized, "content_missing" + candidate = _extract_json_object(normalized) + if candidate is None: + return None, normalized, "json_brace_extract" try: - return json.loads(text[start : end + 1]) + parsed = json.loads(candidate) except json.JSONDecodeError: + return None, normalized, "json_loads" + if not isinstance(parsed, dict): + return None, normalized, "json_not_object" + return parsed, normalized, None + + +def _is_timeout_error(exc: BaseException) -> bool: + if isinstance(exc, socket.timeout): + return True + if isinstance(exc, TimeoutError): + return True + if isinstance(exc, urllib.error.URLError): + return _is_timeout_error(exc.reason) if exc.reason else False + return "timed out" in str(exc).lower() + + +def _extract_responses_output_text(data: Dict[str, Any]) -> Optional[str]: + output = data.get("output") or [] + chunks: list[str] = [] + if isinstance(output, list): + for item in output: + if not isinstance(item, dict): + continue + if item.get("type") == "output_text" and item.get("text"): + chunks.append(str(item["text"])) + 
continue + content = item.get("content") + if isinstance(content, list): + for part in content: + if isinstance(part, dict) and part.get("text"): + chunks.append(str(part["text"])) + if chunks: + return "\n".join(chunks) + text = data.get("text") + if isinstance(text, str): + return text + return None + + +def _log_llm(message: str) -> None: + print(f"LLM {message}") + + +def llm_result_payload(result: Optional[LLMCallResult]) -> Optional[Dict[str, Any]]: + if result is None or result.status != "ok": return None + return result.parsed_content + +def coerce_llm_call_result(value: Any) -> LLMCallResult: + if isinstance(value, LLMCallResult): + return value + if isinstance(value, dict): + return LLMCallResult( + status="ok", + parsed_content=value, + content_text=json.dumps(value, ensure_ascii=True), + parse_stage="compat_dict", + request_mode="chat_completions", + schema_valid=True, + ) + if value is None: + return LLMCallResult( + status="disabled", + error="empty_result", + parse_stage="compat_none", + request_mode="chat_completions", + ) + return LLMCallResult( + status="transport_error", + error=f"unsupported_llm_result_type:{type(value).__name__}", + parse_stage="compat_invalid", + request_mode="chat_completions", + ) + + +def call_llm_for_schema(prompt: str, required_keys: Sequence[str]) -> LLMCallResult: + return call_llm(prompt=prompt, required_keys=required_keys) -def call_llm(prompt: str) -> Optional[Dict[str, Any]]: + +def call_llm(prompt: str, required_keys: Optional[Sequence[str]] = None) -> LLMCallResult: api_url = os.getenv("LLM_API_URL", "http://localhost:3000/api/chat/completions") api_url = rewrite_container_host_url(api_url) api_key = os.getenv("LLM_API_KEY") model = os.getenv("LLM_MODEL", "agentstudyassistant") - timeout = int(os.getenv("LLM_TIMEOUT", "180")) + timeout = int(os.getenv("LLM_TIMEOUT", "300")) log_enabled = os.getenv("LLM_LOG", "0") == "1" log_prompt = os.getenv("LLM_LOG_PROMPT", "0") == "1" log_response = os.getenv("LLM_LOG_RESPONSE", "0") == "1" log_json = os.getenv("LLM_LOG_JSON", "0") == "1" dry_run = os.getenv("LLM_DRY_RUN", "0") == "1" use_responses = os.getenv("LLM_USE_RESPONSES", "0") == "1" + request_mode = "responses" if use_responses else "chat_completions" if log_enabled: - print(f"LLM CONFIG > url={api_url} model={model} timeout={timeout} responses={use_responses}") + _log_llm( + f'CONFIG > url={api_url} model={model} timeout={timeout} request_mode={request_mode} prompt_chars={len(prompt)}' + ) if dry_run: if log_enabled or log_prompt: - print("LLM DRY RUN > skipping API call") - print("LLM OUTGOING PROMPT >", prompt) - return None + _log_llm("DRY RUN > skipping API call") + _log_llm(f"OUTGOING PROMPT > {prompt}") + return LLMCallResult( + status="disabled", + error="dry_run_enabled", + parse_stage="request_skipped", + request_mode=request_mode, + ) if not api_key: if log_enabled: - print("LLM ERROR > missing LLM_API_KEY") - return None + _log_llm("ERROR > missing LLM_API_KEY") + return LLMCallResult( + status="disabled", + error="missing_api_key", + parse_stage="request_skipped", + request_mode=request_mode, + ) if use_responses: payload = { @@ -265,55 +426,125 @@ def call_llm(prompt: str) -> Optional[Dict[str, Any]]: request.add_header("Authorization", f"Bearer {api_key}") if log_enabled or log_prompt: - print("LLM OUTGOING PROMPT >", prompt) + _log_llm(f"OUTGOING PROMPT > {prompt}") + start = time.time() + http_status: Optional[int] = None try: - start = time.time() with urllib.request.urlopen(request, timeout=timeout) as response: raw = 
response.read().decode("utf-8") - if log_enabled: - print(f"LLM TIMING > seconds={time.time() - start:.2f}") + http_status = getattr(response, "status", None) except urllib.error.HTTPError as exc: - raw = exc.read().decode("utf-8") + raw = exc.read().decode("utf-8", errors="replace") + duration = time.time() - start if log_enabled or log_response: - print(f"LLM HTTP ERROR > {exc.code}") - print("LLM ERROR BODY >", raw) - return None + _log_llm(f"HTTP ERROR > status={exc.code} seconds={duration:.2f}") + _log_llm(f"ERROR BODY > {raw}") + return LLMCallResult( + status="http_error", + raw_response=raw, + http_status=exc.code, + duration_seconds=duration, + error=f"http_{exc.code}", + parse_stage="http_response", + request_mode=request_mode, + ) except urllib.error.URLError as exc: + duration = time.time() - start + status = "timeout" if _is_timeout_error(exc) else "transport_error" if log_enabled: - print(f"LLM ERROR > {exc}") - return None + _log_llm(f"ERROR > status={status} seconds={duration:.2f} detail={exc}") + return LLMCallResult( + status=status, + duration_seconds=duration, + error=str(exc), + parse_stage="transport", + request_mode=request_mode, + ) + except TimeoutError as exc: + duration = time.time() - start + if log_enabled: + _log_llm(f"ERROR > status=timeout seconds={duration:.2f} detail={exc}") + return LLMCallResult( + status="timeout", + duration_seconds=duration, + error=str(exc), + parse_stage="transport", + request_mode=request_mode, + ) + duration = time.time() - start + if log_enabled: + _log_llm(f"TIMING > seconds={duration:.2f}") if log_enabled or log_response: - print("LLM RAW RESPONSE >", raw) + _log_llm(f"RAW RESPONSE > {raw}") try: data = json.loads(raw) except json.JSONDecodeError: - return _extract_json_object(raw) - if log_json: - print("LLM JSON >", data) + data = None + if log_json and data is not None: + _log_llm(f"JSON > {data}") + content_text: Optional[str] = None + parse_stage = "envelope" if use_responses: - output_text = None if isinstance(data, dict): - output = data.get("output") or [] - if output and isinstance(output, list): - for item in output: - if isinstance(item, dict) and item.get("type") == "output_text": - output_text = item.get("text") - break - return _extract_json_object(output_text or raw) - - content = None - if isinstance(data, dict): - choices = data.get("choices") or [] - if choices and isinstance(choices[0], dict): - msg = choices[0].get("message") - if isinstance(msg, dict): - content = msg.get("content") - if content is None: - content = choices[0].get("text") - if content is None: - content = raw - return _extract_json_object(content) + content_text = _extract_responses_output_text(data) + parse_stage = "responses_output" + else: + content_text = raw + parse_stage = "responses_raw" + else: + if isinstance(data, dict): + choices = data.get("choices") or [] + if choices and isinstance(choices[0], dict): + msg = choices[0].get("message") + if isinstance(msg, dict): + content_text = msg.get("content") + if isinstance(content_text, list): + chunks = [] + for part in content_text: + if isinstance(part, dict) and part.get("text"): + chunks.append(str(part["text"])) + content_text = "\n".join(chunks) if chunks else None + if content_text is None: + content_text = choices[0].get("text") + parse_stage = "chat_completions_content" + else: + content_text = raw + parse_stage = "chat_completions_raw" + + parse_source = content_text + if parse_source is None and data is None: + parse_source = raw + parsed, normalized_content, 
parse_error_stage = _parse_json_content(parse_source) + result = LLMCallResult( + status="ok" if parsed is not None else "json_parse_failed", + raw_response=raw, + parsed_content=parsed, + content_text=normalized_content or content_text, + http_status=http_status, + duration_seconds=duration, + error=None if parsed is not None else "json_parse_failed", + parse_stage=parse_stage if parse_error_stage is None else f"{parse_stage}:{parse_error_stage}", + request_mode=request_mode, + ) + + if parsed is None: + if log_enabled: + _log_llm(f"PARSE RESULT > status={result.status} parse_stage={result.parse_stage}") + return result + + missing_keys = [key for key in (required_keys or []) if key not in parsed] + result.schema_valid = len(missing_keys) == 0 + result.missing_keys = missing_keys + if missing_keys: + result.status = "schema_mismatch" + result.error = f"missing_required_keys:{','.join(missing_keys)}" + result.parse_stage = f"{result.parse_stage}:schema" + if log_enabled: + _log_llm( + f"PARSE RESULT > status={result.status} parse_stage={result.parse_stage} schema_valid={result.schema_valid}" + ) + return result diff --git a/acp_agent/study_agent_acp/server.py b/acp_agent/study_agent_acp/server.py index ebb8fe3..fc4d1f8 100644 --- a/acp_agent/study_agent_acp/server.py +++ b/acp_agent/study_agent_acp/server.py @@ -21,6 +21,44 @@ SERVICE_REGISTRY_PATH = os.getenv("STUDY_AGENT_SERVICE_REGISTRY", "docs/SERVICE_REGISTRY.yaml") +def _sanitize_config_value(name: str, value: Optional[str]) -> Optional[str]: + if value is None: + return None + upper = name.upper() + if "KEY" in upper or "TOKEN" in upper or "SECRET" in upper: + return "***" + return value + + +def _log_startup_config() -> None: + config_names = [ + "LLM_API_URL", + "LLM_MODEL", + "LLM_USE_RESPONSES", + "LLM_TIMEOUT", + "STUDY_AGENT_MCP_TIMEOUT", + "LLM_CANDIDATE_LIMIT", + "LLM_RECOMMENDATION_MAX_RESULTS", + "LLM_RECOMMENDATION_TOP_K", + "EMBED_TIMEOUT", + "ACP_TIMEOUT", + ] + items = [] + for name in config_names: + items.append(f"{name}={_sanitize_config_value(name, os.getenv(name))}") + print("ACP CONFIG > " + " ".join(items)) + + +def _warn_on_inconsistent_llm_config() -> None: + api_url = os.getenv("LLM_API_URL", "") + use_responses = os.getenv("LLM_USE_RESPONSES", "0") + if "/api/chat/completions" in api_url and use_responses == "1": + print( + "ACP WARN > LLM_API_URL targets /api/chat/completions while LLM_USE_RESPONSES=1. " + "Set LLM_USE_RESPONSES=0 for chat-completions compatibility." 
+    )
+
+
 def _read_json(handler: BaseHTTPRequestHandler) -> Dict[str, Any]:
     length = int(handler.headers.get("Content-Length", "0"))
     if length <= 0:
@@ -499,7 +537,9 @@ def main(host: str = "127.0.0.1", port: int = 8765) -> None:
     mcp_cwd = os.getenv("STUDY_AGENT_MCP_CWD") or os.getcwd()
     mcp_url = os.getenv("STUDY_AGENT_MCP_URL")
     mcp_token = os.getenv("STUDY_AGENT_MCP_TOKEN")
-    mcp_timeout = int(os.getenv("STUDY_AGENT_MCP_TIMEOUT", "30"))
+    mcp_timeout = int(os.getenv("STUDY_AGENT_MCP_TIMEOUT", "240"))
+    _log_startup_config()
+    _warn_on_inconsistent_llm_config()

     if mcp_url:
         if "://" in mcp_url and ":" not in mcp_url.split("://", 1)[1]:
@@ -565,7 +605,9 @@ def _serve(server: HTTPServer, mcp_client: Optional[object]) -> None:
         server.serve_forever()
     finally:
         try:
-            server.server_close()
+            close = getattr(server, "server_close", None)
+            if callable(close):
+                close()
         finally:
             if mcp_client is not None:
                 try:
diff --git a/acp_agent/study_agent_acp/timeout_calibration.py b/acp_agent/study_agent_acp/timeout_calibration.py
new file mode 100644
index 0000000..bc358f5
--- /dev/null
+++ b/acp_agent/study_agent_acp/timeout_calibration.py
@@ -0,0 +1,124 @@
+from __future__ import annotations
+
+import math
+import re
+from typing import Any, Dict, Iterable, List, Optional
+
+_EMBED_SECONDS_RE = re.compile(r"EMBED DEBUG > .* seconds=(?P<seconds>\d+(?:\.\d+)?)")
+
+
+def percentile(values: Iterable[float], p: float) -> float:
+    ordered = sorted(float(v) for v in values if v is not None)
+    if not ordered:
+        return 0.0
+    if len(ordered) == 1:
+        return ordered[0]
+    p = max(0.0, min(100.0, float(p)))
+    rank = (len(ordered) - 1) * (p / 100.0)
+    lower = math.floor(rank)
+    upper = math.ceil(rank)
+    if lower == upper:
+        return ordered[lower]
+    weight = rank - lower
+    return ordered[lower] * (1.0 - weight) + ordered[upper] * weight
+
+
+def recommend_timeout(
+    observed_seconds: Iterable[float],
+    minimum: int,
+    p95_multiplier: float = 1.5,
+    max_multiplier: float = 1.25,
+    pad_seconds: int = 0,
+) -> int:
+    observed = [float(v) for v in observed_seconds if float(v) > 0]
+    if not observed:
+        return int(minimum)
+    p95 = percentile(observed, 95)
+    peak = max(observed)
+    recommended = max(minimum, math.ceil(p95 * p95_multiplier), math.ceil(peak * max_multiplier))
+    return int(recommended + max(0, int(pad_seconds)))
+
+
+def parse_embed_debug_seconds(log_text: str) -> List[float]:
+    values: List[float] = []
+    for match in _EMBED_SECONDS_RE.finditer(log_text or ""):
+        values.append(float(match.group("seconds")))
+    return values
+
+
+def calibrate_timeout_recommendations(
+    runs: List[Dict[str, Any]],
+    embed_seconds: Optional[List[float]] = None,
+) -> Dict[str, Any]:
+    embed_seconds = embed_seconds or []
+    llm_durations = [
+        float((run.get("diagnostics") or {}).get("llm_duration_seconds") or 0.0)
+        for run in runs
+        if isinstance(run, dict)
+    ]
+    wall_durations = [float(run.get("wall_seconds") or 0.0) for run in runs if isinstance(run, dict)]
+    non_llm_durations = []
+    for run in runs:
+        if not isinstance(run, dict):
+            continue
+        diagnostics = run.get("diagnostics") or {}
+        wall = float(run.get("wall_seconds") or 0.0)
+        llm = float(diagnostics.get("llm_duration_seconds") or 0.0)
+        non_llm_durations.append(max(0.0, wall - llm))
+    mcp_observed = sorted([v for v in non_llm_durations + embed_seconds if v > 0])
+
+    embed_timeout = max(
+        60,
+        recommend_timeout(embed_seconds, minimum=120, p95_multiplier=1.5, max_multiplier=1.25, pad_seconds=5),
+    )
+    llm_timeout = recommend_timeout(llm_durations, minimum=120, p95_multiplier=1.5,
max_multiplier=1.25, pad_seconds=10) + mcp_timeout = max( + embed_timeout, + recommend_timeout(mcp_observed, minimum=60, p95_multiplier=1.5, max_multiplier=1.25, pad_seconds=10), + ) + acp_floor = llm_timeout + max(30, math.ceil(mcp_timeout * 0.25)) + acp_timeout = max( + acp_floor, + recommend_timeout(wall_durations, minimum=180, p95_multiplier=1.35, max_multiplier=1.2, pad_seconds=15), + ) + if acp_timeout <= llm_timeout: + acp_timeout = llm_timeout + 30 + + llm_failures = [] + for run in runs: + diagnostics = (run or {}).get("diagnostics") or {} + status = diagnostics.get("llm_status") + if status and status != "ok": + llm_failures.append( + { + "flow": run.get("flow"), + "llm_status": status, + "fallback_reason": run.get("fallback_reason"), + } + ) + + return { + "observed": { + "runs": len(runs), + "wall_seconds_p95": round(percentile(wall_durations, 95), 3), + "llm_seconds_p95": round(percentile(llm_durations, 95), 3), + "mcp_proxy_seconds_p95": round(percentile(mcp_observed, 95), 3), + "embed_seconds_p95": round(percentile(embed_seconds, 95), 3), + }, + "recommended_env": { + "EMBED_TIMEOUT": embed_timeout, + "STUDY_AGENT_MCP_TIMEOUT": mcp_timeout, + "LLM_TIMEOUT": llm_timeout, + "ACP_TIMEOUT": acp_timeout, + }, + "llm_failures": llm_failures, + } + + +def render_env_fragment(calibration: Dict[str, Any]) -> str: + env = calibration.get("recommended_env") or {} + lines = ["# Generated by scripts/calibrate_timeouts.py"] + for key in ("EMBED_TIMEOUT", "STUDY_AGENT_MCP_TIMEOUT", "LLM_TIMEOUT", "ACP_TIMEOUT"): + if key in env: + lines.append(f"{key}={env[key]}") + return "\n".join(lines) + "\n" diff --git a/docs/TESTING.md b/docs/TESTING.md index dd0db91..9cdaca5 100644 --- a/docs/TESTING.md +++ b/docs/TESTING.md @@ -197,6 +197,20 @@ Health check (PowerShell): Invoke-RestMethod -Uri http://127.0.0.1:8765/health ``` +Windows logging (redirect stdout/stderr to files): + +```powershell +study-agent-mcp 1> mcp.out.log 2> mcp.err.log +study-agent-acp 1> acp.out.log 2> acp.err.log +``` + +Or using `Start-Process`: + +```powershell +Start-Process study-agent-mcp -RedirectStandardOutput mcp.out.log -RedirectStandardError mcp.err.log +Start-Process study-agent-acp -RedirectStandardOutput acp.out.log -RedirectStandardError acp.err.log +``` + Recommended MCP environment (use absolute paths for stability): ```bash @@ -230,10 +244,17 @@ export LLM_MODEL="gemma3:4b" export LLM_DRY_RUN=0 export LLM_USE_RESPONSES=0 export LLM_LOG=1 +export LLM_TIMEOUT=300 +export STUDY_AGENT_MCP_TIMEOUT=240 +export ACP_TIMEOUT=360 +export EMBED_TIMEOUT=120 +export LLM_CANDIDATE_LIMIT=5 +export LLM_RECOMMENDATION_MAX_RESULTS=3 ``` `LLM_LOG=1` enables verbose LLM logging to ACP stdout (config, prompt, raw response). For OpenWebUI using `/api/chat/completions`, keep `LLM_USE_RESPONSES=0` (the Responses API schema is not supported and can yield empty outputs). +Recommended timeout ladder: `ACP_TIMEOUT > LLM_TIMEOUT > STUDY_AGENT_MCP_TIMEOUT`. Then call: @@ -243,6 +264,44 @@ curl -s -X POST http://127.0.0.1:8765/flows/phenotype_recommendation \ -d '{"study_intent":"Identify clinical risk factors for older adult patients who experience an adverse event of acute gastro-intenstinal (GI) bleeding", "top_k":20, "max_results":10,"candidate_limit":10}' ``` +Expected recommendation responses now include `llm_used`, `llm_status`, `fallback_reason`, `fallback_mode`, and `diagnostics`. 
If the LLM path fails to parse or validate, ACP still returns `status: ok` with an explicit machine-readable fallback reason instead of silently degrading. + +## Timeout calibration + +Use the automated calibration task to derive environment-specific starting values for `EMBED_TIMEOUT`, `STUDY_AGENT_MCP_TIMEOUT`, `LLM_TIMEOUT`, and `ACP_TIMEOUT`: + +```bash +doit calibrate_timeouts +``` + +What it does: + +- starts MCP and ACP if they are not already running +- warms up and samples `phenotype_intent_split`, `phenotype_recommendation_advice`, and `phenotype_recommendation` +- tests multiple recommendation prompt sizes using `TIMEOUT_CALIBRATION_CANDIDATE_LIMITS` (default `3,5,8`) +- uses ACP diagnostics plus MCP embedding debug logs to recommend timeouts with safety margins + +Useful overrides: + +```bash +export TIMEOUT_CALIBRATION_RUNS=3 +export TIMEOUT_CALIBRATION_CANDIDATE_LIMITS=3,5,8 +export TIMEOUT_CALIBRATION_ENV_PATH=/tmp/study_agent_timeout_recommendations.env +export TIMEOUT_CALIBRATION_JSON_PATH=/tmp/study_agent_timeout_recommendations.json +doit calibrate_timeouts +``` + +Outputs: + +- `.env` fragment with recommended timeout values +- JSON summary with observed p95 timings, fallback statuses, and per-run details + +Interpretation notes: + +- If the calibration run reports repeated `llm_status != ok`, fix LLM parsing/compatibility first rather than only raising timeouts. +- If larger `candidate_limit` values sharply increase latency, prefer a smaller `LLM_CANDIDATE_LIMIT` before increasing `LLM_TIMEOUT`. +- Treat the generated values as good starting points for that environment, not universal maxima. + Phenotype intent split (target/outcome statements): ```bash diff --git a/dodo.py b/dodo.py index 2f94166..415baab 100644 --- a/dodo.py +++ b/dodo.py @@ -24,8 +24,12 @@ "LLM_LOG_JSON": os.getenv("LLM_LOG_JSON", "0"), "LLM_DRY_RUN": os.getenv("LLM_DRY_RUN", "0"), "LLM_USE_RESPONSES": os.getenv("LLM_USE_RESPONSES", "0"), - "LLM_CANDIDATE_LIMIT": os.getenv("LLM_CANDIDATE_LIMIT", "10"), - "ACP_TIMEOUT": os.getenv("ACP_TIMEOUT", "180"), + "LLM_CANDIDATE_LIMIT": os.getenv("LLM_CANDIDATE_LIMIT", "5"), + "LLM_RECOMMENDATION_MAX_RESULTS": os.getenv("LLM_RECOMMENDATION_MAX_RESULTS", "3"), + "LLM_RECOMMENDATION_TOP_K": os.getenv("LLM_RECOMMENDATION_TOP_K", "20"), + "EMBED_TIMEOUT": os.getenv("EMBED_TIMEOUT", "120"), + "STUDY_AGENT_MCP_TIMEOUT": os.getenv("STUDY_AGENT_MCP_TIMEOUT", "240"), + "ACP_TIMEOUT": os.getenv("ACP_TIMEOUT", "360"), "STUDY_AGENT_HOST": os.getenv("STUDY_AGENT_HOST", "127.0.0.1"), "STUDY_AGENT_PORT": os.getenv("STUDY_AGENT_PORT", "8765"), } @@ -138,6 +142,63 @@ def task_run_all(): } +def task_calibrate_timeouts(): + def _run_calibration() -> None: + env = os.environ.copy() + if not env.get("LLM_API_KEY"): + print("Missing LLM_API_KEY in environment. 
Set it before running this task.") + return + for key, value in DEFAULT_ENV.items(): + env.setdefault(key, value) + if not env.get("STUDY_AGENT_MCP_URL"): + env.setdefault("STUDY_AGENT_MCP_COMMAND", "study-agent-mcp") + env.setdefault("STUDY_AGENT_MCP_ARGS", "") + env.setdefault("LLM_LOG", "1") + env.setdefault("LLM_LOG_PROMPT", "0") + env.setdefault("LLM_LOG_RESPONSE", "0") + env.setdefault("STUDY_AGENT_DEBUG", "1") + env.setdefault("EMBED_LOG", "1") + env.setdefault("TIMEOUT_CALIBRATION_RUNS", "3") + env.setdefault("TIMEOUT_CALIBRATION_CANDIDATE_LIMITS", "3,5,8") + env.setdefault("TIMEOUT_CALIBRATION_ENV_PATH", "/tmp/study_agent_timeout_recommendations.env") + env.setdefault("TIMEOUT_CALIBRATION_JSON_PATH", "/tmp/study_agent_timeout_recommendations.json") + + acp_stdout = env.get("ACP_STDOUT", "/tmp/study_agent_acp_stdout.log") + acp_stderr = env.get("ACP_STDERR", "/tmp/study_agent_acp_stderr.log") + mcp_proc = _start_mcp_http_if_needed(env) + print("Starting ACP for timeout calibration...") + with open(acp_stdout, "w", encoding="utf-8") as out, open(acp_stderr, "w", encoding="utf-8") as err: + acp_proc = subprocess.Popen(["study-agent-acp"], env=env, stdout=out, stderr=err) + try: + print("Waiting for ACP health endpoint...") + require_mcp = bool(env.get("STUDY_AGENT_MCP_URL") or env.get("STUDY_AGENT_MCP_COMMAND")) + _wait_for_acp("http://127.0.0.1:8765/health", timeout_s=30, require_mcp=require_mcp) + print("Running timeout calibration...") + subprocess.run(["python", "scripts/calibrate_timeouts.py"], check=True, env=env) + print(f"ACP logs: {acp_stdout} {acp_stderr}") + print(f"Recommended env: {env['TIMEOUT_CALIBRATION_ENV_PATH']}") + print(f"Calibration details: {env['TIMEOUT_CALIBRATION_JSON_PATH']}") + finally: + print("Stopping ACP...") + acp_proc.terminate() + try: + acp_proc.wait(timeout=10) + except subprocess.TimeoutExpired: + acp_proc.kill() + if mcp_proc is not None: + print("Stopping MCP...") + mcp_proc.terminate() + try: + mcp_proc.wait(timeout=10) + except subprocess.TimeoutExpired: + mcp_proc.kill() + + return { + "actions": [_run_calibration], + "verbosity": 2, + } + + def task_check_llm_connectivity(): def _run_check() -> None: env = os.environ.copy() diff --git a/mcp_server/study_agent_mcp/retrieval/index.py b/mcp_server/study_agent_mcp/retrieval/index.py index 8d4b5d6..c2fb95b 100644 --- a/mcp_server/study_agent_mcp/retrieval/index.py +++ b/mcp_server/study_agent_mcp/retrieval/index.py @@ -6,6 +6,7 @@ import os import pickle import re +import time import urllib.error import urllib.request from dataclasses import dataclass @@ -59,7 +60,14 @@ class EmbeddingClient: url: str model: str api_key: Optional[str] = None - timeout: int = 30 + timeout: Optional[int] = None + + def __post_init__(self) -> None: + if self.timeout is None: + self.timeout = int(os.getenv("EMBED_TIMEOUT", "120")) + + def _debug_enabled(self) -> bool: + return os.getenv("STUDY_AGENT_DEBUG", "0") == "1" or os.getenv("EMBED_LOG", "0") == "1" def embed_texts(self, texts: List[str]) -> List[List[float]]: payload = json.dumps({"model": self.model, "input": texts}).encode("utf-8") @@ -67,19 +75,33 @@ def embed_texts(self, texts: List[str]) -> List[List[float]]: request.add_header("Content-Type", "application/json") if self.api_key: request.add_header("Authorization", f"Bearer {self.api_key}") + start = time.time() try: with urllib.request.urlopen(request, timeout=self.timeout) as response: raw = response.read().decode("utf-8") except urllib.error.URLError as exc: raise RuntimeError(f"Embedding request 
failed: {exc}") from exc - data = json.loads(raw) + duration = time.time() - start + if self._debug_enabled(): + print( + f"EMBED DEBUG > url={self.url} model={self.model} timeout={self.timeout} " + f"texts={len(texts)} seconds={duration:.2f}" + ) + try: + data = json.loads(raw) + except json.JSONDecodeError as exc: + raise RuntimeError(f"Embedding response was not valid JSON: {exc}. body={raw[:400]}") from exc if isinstance(data.get("embeddings"), list): return data["embeddings"] if isinstance(data.get("data"), list): return [row.get("embedding") for row in data["data"]] if isinstance(data.get("embedding"), list): return [data["embedding"]] - raise RuntimeError("Embedding response missing embeddings payload.") + if isinstance(data, dict): + keys = ",".join(sorted(data.keys())) + preview = raw[:400] + raise RuntimeError(f"Embedding response missing embeddings payload. keys={keys} body={preview}") + raise RuntimeError(f"Embedding response malformed. body={raw[:400]}") class PhenotypeIndex: diff --git a/scripts/calibrate_timeouts.py b/scripts/calibrate_timeouts.py new file mode 100644 index 0000000..f9e6736 --- /dev/null +++ b/scripts/calibrate_timeouts.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import json +import os +import sys +import time +import urllib.error +import urllib.request +from pathlib import Path +from typing import Any, Dict, List + +REPO_ROOT = Path(__file__).resolve().parents[1] +ACP_PACKAGE_ROOT = REPO_ROOT / "acp_agent" +if str(ACP_PACKAGE_ROOT) not in sys.path: + sys.path.insert(0, str(ACP_PACKAGE_ROOT)) + +from study_agent_acp.timeout_calibration import ( # noqa: E402 + calibrate_timeout_recommendations, + parse_embed_debug_seconds, + render_env_fragment, +) + +ACP_BASE_URL = os.getenv("ACP_BASE_URL", "http://127.0.0.1:8765") +ACP_TIMEOUT = int(os.getenv("ACP_TIMEOUT", "360")) +CALIBRATION_RUNS = int(os.getenv("TIMEOUT_CALIBRATION_RUNS", "3")) +LLM_CANDIDATE_LIMITS = [ + int(part.strip()) + for part in os.getenv("TIMEOUT_CALIBRATION_CANDIDATE_LIMITS", "3,5,8").split(",") + if part.strip() +] +OUTPUT_ENV_PATH = os.getenv("TIMEOUT_CALIBRATION_ENV_PATH", "/tmp/study_agent_timeout_recommendations.env") +OUTPUT_JSON_PATH = os.getenv("TIMEOUT_CALIBRATION_JSON_PATH", "/tmp/study_agent_timeout_recommendations.json") +MCP_STDOUT_PATH = os.getenv("MCP_STDOUT", "/tmp/study_agent_mcp_stdout.log") + +STUDY_INTENT = ( + "Study intent: Identify clinical risk factors for older adult patients who experience " + "an adverse event of acute gastro-intenstinal (GI) bleeding. The GI bleed has to be " + "detected in the hospital setting. Risk factors can include concomitant medications " + "or chronic and acute conditions." 
+) + + +def _post_flow(path: str, payload: Dict[str, Any]) -> Dict[str, Any]: + body = json.dumps(payload).encode("utf-8") + request = urllib.request.Request(f"{ACP_BASE_URL}{path}", data=body, method="POST") + request.add_header("Content-Type", "application/json") + start = time.time() + with urllib.request.urlopen(request, timeout=ACP_TIMEOUT) as response: + raw = response.read().decode("utf-8") + wall_seconds = time.time() - start + data = json.loads(raw) + data["wall_seconds"] = round(wall_seconds, 3) + return data + + +def _run_calibration() -> List[Dict[str, Any]]: + runs: List[Dict[str, Any]] = [] + for _ in range(CALIBRATION_RUNS): + result = _post_flow("/flows/phenotype_intent_split", {"study_intent": STUDY_INTENT}) + runs.append({"flow": "phenotype_intent_split", **result}) + for _ in range(CALIBRATION_RUNS): + result = _post_flow("/flows/phenotype_recommendation_advice", {"study_intent": STUDY_INTENT}) + runs.append({"flow": "phenotype_recommendation_advice", **result}) + for candidate_limit in LLM_CANDIDATE_LIMITS: + for _ in range(CALIBRATION_RUNS): + result = _post_flow( + "/flows/phenotype_recommendation", + { + "study_intent": STUDY_INTENT, + "top_k": max(20, candidate_limit), + "max_results": int(os.getenv("LLM_RECOMMENDATION_MAX_RESULTS", "3")), + "candidate_limit": candidate_limit, + }, + ) + runs.append( + { + "flow": "phenotype_recommendation", + "candidate_limit": candidate_limit, + **result, + } + ) + return runs + + +def main() -> int: + try: + runs = _run_calibration() + except urllib.error.HTTPError as exc: + raw = exc.read().decode("utf-8", errors="replace") + print(raw, file=sys.stderr) + return 1 + except Exception as exc: + print(f"Calibration failed: {exc}", file=sys.stderr) + return 1 + + embed_seconds: List[float] = [] + mcp_stdout = Path(MCP_STDOUT_PATH) + if mcp_stdout.exists(): + embed_seconds = parse_embed_debug_seconds(mcp_stdout.read_text(encoding="utf-8")) + + calibration = calibrate_timeout_recommendations(runs, embed_seconds=embed_seconds) + calibration["runs"] = runs + calibration["candidate_limits_tested"] = LLM_CANDIDATE_LIMITS + calibration["run_count_per_flow"] = CALIBRATION_RUNS + + Path(OUTPUT_JSON_PATH).write_text(json.dumps(calibration, indent=2), encoding="utf-8") + Path(OUTPUT_ENV_PATH).write_text(render_env_fragment(calibration), encoding="utf-8") + + print(json.dumps(calibration, indent=2)) + print(f"Wrote env recommendations to {OUTPUT_ENV_PATH}") + print(f"Wrote calibration details to {OUTPUT_JSON_PATH}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_acp_phenotype_flow.py b/tests/test_acp_phenotype_flow.py index a76211b..85d90dc 100644 --- a/tests/test_acp_phenotype_flow.py +++ b/tests/test_acp_phenotype_flow.py @@ -2,6 +2,7 @@ from study_agent_acp.agent import StudyAgent import study_agent_acp.agent as agent_module +from study_agent_acp.llm_client import LLMCallResult class StubMCPClient: @@ -51,5 +52,38 @@ def fake_llm(prompt): assert result["candidate_limit"] == 1 assert result["candidate_count"] == 1 assert result["llm_used"] is True + assert result["llm_status"] == "ok" + assert result["fallback_reason"] is None + assert result["diagnostics"]["llm_schema_valid"] is True recs = result["recommendations"]["phenotype_recommendations"] assert len(recs) == 1 + + +@pytest.mark.acp +def test_acp_flow_parse_failure_returns_explicit_fallback(monkeypatch): + def fake_llm(prompt, required_keys=None): + return LLMCallResult( + status="json_parse_failed", + error="json_parse_failed", + 
parse_stage="chat_completions_content:json_loads", + duration_seconds=12.5, + request_mode="chat_completions", + content_text='{"plan": ', + ) + + monkeypatch.setattr(agent_module, "call_llm", fake_llm) + + agent = StudyAgent(mcp_client=StubMCPClient()) + result = agent.run_phenotype_recommendation_flow( + study_intent="test intent", + top_k=5, + max_results=3, + candidate_limit=2, + ) + assert result["status"] == "ok" + assert result["llm_used"] is False + assert result["llm_status"] == "json_parse_failed" + assert result["fallback_reason"] == "llm_json_parse_failed" + assert result["fallback_mode"] == "stub" + assert result["diagnostics"]["llm_parse_stage"] == "chat_completions_content:json_loads" + assert result["recommendations"]["mode"] == "stub" diff --git a/tests/test_acp_server.py b/tests/test_acp_server.py index f09a062..c6eac44 100644 --- a/tests/test_acp_server.py +++ b/tests/test_acp_server.py @@ -3,6 +3,7 @@ from study_agent_acp import server as acp_server from study_agent_acp.mcp_client import StdioMCPClient from study_agent_acp.agent import StudyAgent +from study_agent_acp.llm_client import LLMCallResult @pytest.mark.acp @@ -180,10 +181,36 @@ def fake_llm(prompt): ) assert result["status"] == "ok" assert result["llm_used"] is True + assert result["llm_status"] == "ok" assert result["advice"]["advice"] == "Refine intent" assert "Intent text" in captured.get("prompt", "") +@pytest.mark.acp +def test_flow_phenotype_recommendation_advice_parse_failure(monkeypatch): + import study_agent_acp.agent as agent_module + + def fake_llm(prompt, required_keys=None): + return LLMCallResult( + status="json_parse_failed", + error="json_parse_failed", + parse_stage="chat_completions_content:json_brace_extract", + request_mode="chat_completions", + ) + + monkeypatch.setattr(agent_module, "call_llm", fake_llm) + agent = StudyAgent(mcp_client=StubMCPClient()) + result = agent.run_phenotype_recommendation_advice_flow( + study_intent="Intent text", + ) + assert result["status"] == "ok" + assert result["llm_used"] is False + assert result["llm_status"] == "json_parse_failed" + assert result["fallback_reason"] == "llm_json_parse_failed" + assert result["fallback_mode"] == "stub" + assert result["advice"]["mode"] == "stub" + + @pytest.mark.acp def test_flow_phenotype_recommendation_advice_missing_intent(): agent = StudyAgent(mcp_client=StubMCPClient()) @@ -238,10 +265,37 @@ def fake_llm(prompt): ) assert result["status"] == "ok" assert result["llm_used"] is True + assert result["llm_status"] == "ok" assert result["intent_split"]["target_statement"] == "Target cohort" assert "Intent text" in captured.get("prompt", "") +@pytest.mark.acp +def test_flow_phenotype_intent_split_schema_mismatch(monkeypatch): + import study_agent_acp.agent as agent_module + + def fake_llm(prompt, required_keys=None): + return LLMCallResult( + status="schema_mismatch", + parsed_content={"target_statement": "Target only"}, + parse_stage="chat_completions_content:schema", + error="missing_required_keys:outcome_statement,rationale", + missing_keys=["outcome_statement", "rationale"], + schema_valid=False, + request_mode="chat_completions", + ) + + monkeypatch.setattr(agent_module, "call_llm", fake_llm) + agent = StudyAgent(mcp_client=StubMCPClient()) + result = agent.run_phenotype_intent_split_flow( + study_intent="Intent text", + ) + assert result["status"] == "error" + assert result["error"] == "llm_unavailable" + assert result["diagnostics"]["llm_status"] == "schema_mismatch" + assert result["diagnostics"]["llm_missing_keys"] 
== ["outcome_statement", "rationale"] + + @pytest.mark.acp def test_flow_phenotype_intent_split_missing_intent(): agent = StudyAgent(mcp_client=StubMCPClient()) diff --git a/tests/test_llm_client.py b/tests/test_llm_client.py new file mode 100644 index 0000000..1343061 --- /dev/null +++ b/tests/test_llm_client.py @@ -0,0 +1,145 @@ +import json + +import pytest + +from study_agent_acp import llm_client + + +class _FakeResponse: + def __init__(self, payload, status=200): + self._payload = payload + self.status = status + + def read(self): + return self._payload.encode("utf-8") + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + +@pytest.mark.acp +def test_call_llm_chat_completions_success(monkeypatch): + payload = { + "choices": [ + { + "message": { + "content": json.dumps( + { + "plan": "ok", + "phenotype_recommendations": [{"cohortId": 1}], + } + ) + } + } + ] + } + + monkeypatch.setenv("LLM_API_KEY", "secret") + monkeypatch.setenv("LLM_USE_RESPONSES", "0") + monkeypatch.setattr(llm_client.urllib.request, "urlopen", lambda request, timeout=0: _FakeResponse(json.dumps(payload))) + + result = llm_client.call_llm("prompt", required_keys=["plan", "phenotype_recommendations"]) + assert result.status == "ok" + assert result.parsed_content["plan"] == "ok" + assert result.schema_valid is True + assert result.request_mode == "chat_completions" + + +@pytest.mark.acp +def test_call_llm_strips_fenced_json(monkeypatch): + payload = { + "choices": [ + { + "message": { + "content": "```json\n{\"plan\":\"ok\",\"phenotype_recommendations\":[]}\n```" + } + } + ] + } + + monkeypatch.setenv("LLM_API_KEY", "secret") + monkeypatch.setenv("LLM_USE_RESPONSES", "0") + monkeypatch.setattr(llm_client.urllib.request, "urlopen", lambda request, timeout=0: _FakeResponse(json.dumps(payload))) + + result = llm_client.call_llm("prompt", required_keys=["plan", "phenotype_recommendations"]) + assert result.status == "ok" + assert result.parsed_content["phenotype_recommendations"] == [] + + +@pytest.mark.acp +def test_call_llm_strips_reasoning_prefix(monkeypatch): + payload = { + "choices": [ + { + "message": { + "content": "thought\n{\"advice\":\"Refine intent\"}" + } + } + ] + } + + monkeypatch.setenv("LLM_API_KEY", "secret") + monkeypatch.setenv("LLM_USE_RESPONSES", "0") + monkeypatch.setattr(llm_client.urllib.request, "urlopen", lambda request, timeout=0: _FakeResponse(json.dumps(payload))) + + result = llm_client.call_llm("prompt", required_keys=["advice"]) + assert result.status == "ok" + assert result.parsed_content["advice"] == "Refine intent" + + +@pytest.mark.acp +def test_call_llm_malformed_truncated_json(monkeypatch): + payload = { + "choices": [ + { + "message": { + "content": "{\"plan\":\"oops\"" + } + } + ] + } + + monkeypatch.setenv("LLM_API_KEY", "secret") + monkeypatch.setenv("LLM_USE_RESPONSES", "0") + monkeypatch.setattr(llm_client.urllib.request, "urlopen", lambda request, timeout=0: _FakeResponse(json.dumps(payload))) + + result = llm_client.call_llm("prompt", required_keys=["plan"]) + assert result.status == "json_parse_failed" + assert result.parse_stage.endswith("json_brace_extract") + + +@pytest.mark.acp +def test_call_llm_responses_mode_mismatch(monkeypatch): + payload = {"choices": [{"message": {"content": "{\"advice\":\"hi\"}"}}]} + + monkeypatch.setenv("LLM_API_KEY", "secret") + monkeypatch.setenv("LLM_USE_RESPONSES", "1") + monkeypatch.setattr(llm_client.urllib.request, "urlopen", lambda request, timeout=0: _FakeResponse(json.dumps(payload))) + + 
result = llm_client.call_llm("prompt", required_keys=["advice"]) + assert result.status == "json_parse_failed" + assert result.request_mode == "responses" + + +@pytest.mark.acp +def test_call_llm_missing_required_keys(monkeypatch): + payload = { + "choices": [ + { + "message": { + "content": "{\"advice\":\"Refine intent\"}" + } + } + ] + } + + monkeypatch.setenv("LLM_API_KEY", "secret") + monkeypatch.setenv("LLM_USE_RESPONSES", "0") + monkeypatch.setattr(llm_client.urllib.request, "urlopen", lambda request, timeout=0: _FakeResponse(json.dumps(payload))) + + result = llm_client.call_llm("prompt", required_keys=["advice", "next_steps"]) + assert result.status == "schema_mismatch" + assert result.missing_keys == ["next_steps"] diff --git a/tests/test_timeout_calibration.py b/tests/test_timeout_calibration.py new file mode 100644 index 0000000..312d228 --- /dev/null +++ b/tests/test_timeout_calibration.py @@ -0,0 +1,65 @@ +import pytest + +from study_agent_acp.timeout_calibration import ( + calibrate_timeout_recommendations, + parse_embed_debug_seconds, + percentile, + recommend_timeout, + render_env_fragment, +) + + +@pytest.mark.acp +def test_percentile_interpolates(): + assert percentile([1.0, 2.0, 3.0, 4.0], 50) == pytest.approx(2.5) + + +@pytest.mark.acp +def test_parse_embed_debug_seconds(): + log_text = """ +EMBED DEBUG > url=http://x model=m timeout=120 texts=1 seconds=2.34 +EMBED DEBUG > url=http://x model=m timeout=120 texts=1 seconds=3.21 +""" + assert parse_embed_debug_seconds(log_text) == [2.34, 3.21] + + +@pytest.mark.acp +def test_recommend_timeout_uses_margin(): + assert recommend_timeout([10.0, 12.0, 14.0], minimum=5, p95_multiplier=1.5, max_multiplier=1.25, pad_seconds=10) >= 28 + + +@pytest.mark.acp +def test_calibrate_timeout_recommendations(): + runs = [ + { + "flow": "phenotype_intent_split", + "wall_seconds": 10.0, + "diagnostics": {"llm_duration_seconds": 8.0, "llm_status": "ok"}, + }, + { + "flow": "phenotype_recommendation", + "wall_seconds": 18.0, + "fallback_reason": None, + "diagnostics": {"llm_duration_seconds": 12.0, "llm_status": "ok"}, + }, + { + "flow": "phenotype_recommendation_advice", + "wall_seconds": 9.0, + "fallback_reason": "llm_json_parse_failed", + "diagnostics": {"llm_duration_seconds": 7.0, "llm_status": "json_parse_failed"}, + }, + ] + calibration = calibrate_timeout_recommendations(runs, embed_seconds=[2.5, 3.5]) + env = calibration["recommended_env"] + assert env["ACP_TIMEOUT"] > env["LLM_TIMEOUT"] > 0 + assert env["STUDY_AGENT_MCP_TIMEOUT"] >= env["EMBED_TIMEOUT"] + assert calibration["llm_failures"] == [ + { + "flow": "phenotype_recommendation_advice", + "llm_status": "json_parse_failed", + "fallback_reason": "llm_json_parse_failed", + } + ] + env_text = render_env_fragment(calibration) + assert "LLM_TIMEOUT=" in env_text + assert "ACP_TIMEOUT=" in env_text