From 8790369709d0e178c5d075f08b11fbb47a8253ab Mon Sep 17 00:00:00 2001 From: Cal Reynolds <49540501+calreynolds@users.noreply.github.com> Date: Mon, 16 Mar 2026 16:41:12 -0400 Subject: [PATCH 1/6] Update install.sh Bug that @CheeYuTan found in PR #282! Thank you Steven! --- install.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/install.sh b/install.sh index 1927b4ac..32953478 100755 --- a/install.sh +++ b/install.sh @@ -1086,7 +1086,7 @@ install_skills() { # Determine target directories (array so paths with spaces work) for tool in $TOOLS; do case $tool in - claude) dirs=("$base_dir/.claude/skills") ;; + claude) dirs+=("$base_dir/.claude/skills") ;; cursor) echo "$TOOLS" | grep -q claude || dirs+=("$base_dir/.cursor/skills") ;; copilot) dirs+=("$base_dir/.github/skills") ;; codex) dirs+=("$base_dir/.agents/skills") ;; From 943d44643a4b7273b9a8454d701168011c302726 Mon Sep 17 00:00:00 2001 From: calreynolds Date: Mon, 16 Mar 2026 16:44:01 -0400 Subject: [PATCH 2/6] fix: rename unused loop variable to satisfy ruff B007 Co-authored-by: Isaac --- .test/src/skill_test/agent/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.test/src/skill_test/agent/executor.py b/.test/src/skill_test/agent/executor.py index 7726d6af..6e0e5223 100644 --- a/.test/src/skill_test/agent/executor.py +++ b/.test/src/skill_test/agent/executor.py @@ -510,7 +510,7 @@ async def run_agent( # Pass Databricks auth env vars to MCP server processes if mcp_config: mcp_env = {k: v for k, v in env.items() if k.startswith(("DATABRICKS_",))} - for server_name, server_cfg in mcp_config.items(): + for _server_name, server_cfg in mcp_config.items(): if "env" not in server_cfg and mcp_env: server_cfg["env"] = mcp_env From 2a50c85228917195f1ebd959ad4d3810432ac86a Mon Sep 17 00:00:00 2001 From: calreynolds Date: Mon, 16 Mar 2026 16:45:17 -0400 Subject: [PATCH 3/6] style: apply ruff format to test files Co-authored-by: Isaac --- .test/src/skill_test/agent/executor.py | 100 ++++++++--- .../skill_test/optimize/agent_evaluator.py | 136 ++++++++++----- .test/src/skill_test/optimize/judges.py | 65 ++++++-- .test/src/skill_test/optimize/runner.py | 156 ++++++++++++++---- 4 files changed, 343 insertions(+), 114 deletions(-) diff --git a/.test/src/skill_test/agent/executor.py b/.test/src/skill_test/agent/executor.py index 6e0e5223..c42f6fdf 100644 --- a/.test/src/skill_test/agent/executor.py +++ b/.test/src/skill_test/agent/executor.py @@ -104,7 +104,11 @@ def _build_trace_metrics( if tool_use_id in tool_calls_by_id: tc = tool_calls_by_id[tool_use_id] - tc.result = result_text[:2000] if isinstance(result_text, str) else str(result_text)[:2000] + tc.result = ( + result_text[:2000] + if isinstance(result_text, str) + else str(result_text)[:2000] + ) tc.success = not is_error # Extract file operations from tool results @@ -114,17 +118,23 @@ def _build_trace_metrics( fp = tool_input.get("file_path", "") if fp: metrics.files_created.append(fp) - metrics.file_operations.append(FileOperation(type="create", file_path=fp, timestamp=ts)) + metrics.file_operations.append( + FileOperation(type="create", file_path=fp, timestamp=ts) + ) elif tool_name == "Edit" and tc.success: fp = tool_input.get("file_path", "") if fp: metrics.files_modified.append(fp) - metrics.file_operations.append(FileOperation(type="edit", file_path=fp, timestamp=ts)) + metrics.file_operations.append( + FileOperation(type="edit", file_path=fp, timestamp=ts) + ) elif tool_name == "Read": fp = tool_input.get("file_path", "") if fp: metrics.files_read.append(fp) - metrics.file_operations.append(FileOperation(type="read", file_path=fp, timestamp=ts)) + metrics.file_operations.append( + FileOperation(type="read", file_path=fp, timestamp=ts) + ) elif event.type == "assistant_turn": num_turns += 1 @@ -197,7 +207,9 @@ def _load_mcp_config() -> dict[str, Any]: resolved_cfg[key] = val.replace("${CLAUDE_PLUGIN_ROOT}", str(repo_root)) elif isinstance(val, list): resolved_cfg[key] = [ - v.replace("${CLAUDE_PLUGIN_ROOT}", str(repo_root)) if isinstance(v, str) else v + v.replace("${CLAUDE_PLUGIN_ROOT}", str(repo_root)) + if isinstance(v, str) + else v for v in val ] else: @@ -283,7 +295,11 @@ def _get_agent_env() -> dict[str, str]: # 2. Env vars with known prefixes override settings file values # Skip internal Claude Code vars that would confuse the subprocess - _skip_keys = {"CLAUDE_CODE_SSE_PORT", "CLAUDE_CODE_ENTRYPOINT", "CLAUDE_CODE_DISABLE_FEEDBACK_SURVEY"} + _skip_keys = { + "CLAUDE_CODE_SSE_PORT", + "CLAUDE_CODE_ENTRYPOINT", + "CLAUDE_CODE_DISABLE_FEEDBACK_SURVEY", + } for key, value in os.environ.items(): if key in _skip_keys: continue @@ -301,7 +317,9 @@ def _get_agent_env() -> dict[str, str]: return env -def _get_mlflow_stop_hook(mlflow_experiment: str | None = None, skill_name: str | None = None): +def _get_mlflow_stop_hook( + mlflow_experiment: str | None = None, skill_name: str | None = None +): """Create an MLflow Stop hook that processes the transcript into a real trace. Mirrors the pattern from databricks-builder-app/server/services/agent.py: @@ -358,7 +376,9 @@ def _get_mlflow_stop_hook(mlflow_experiment: str | None = None, skill_name: str try: mlflow.set_experiment(experiment_name) except Exception as e: - logger.warning("MLflow set_experiment('%s') failed: %s", experiment_name, e) + logger.warning( + "MLflow set_experiment('%s') failed: %s", experiment_name, e + ) try: mlflow.create_experiment(experiment_name) mlflow.set_experiment(experiment_name) @@ -371,7 +391,9 @@ def _get_mlflow_stop_hook(mlflow_experiment: str | None = None, skill_name: str ) return None, None - print(f" [MLflow] Tracing configured: uri={tracking_uri} experiment={experiment_name}") + print( + f" [MLflow] Tracing configured: uri={tracking_uri} experiment={experiment_name}" + ) _mlflow_env_configured = True async def mlflow_stop_hook(input_data, tool_use_id, context): @@ -379,7 +401,9 @@ async def mlflow_stop_hook(input_data, tool_use_id, context): session_id = input_data.get("session_id") transcript_path = input_data.get("transcript_path") - print(f" [MLflow] Stop hook fired: session={session_id}, transcript={transcript_path}") + print( + f" [MLflow] Stop hook fired: session={session_id}, transcript={transcript_path}" + ) try: # Ensure MLflow is set up (matches builder app: call every time) @@ -391,7 +415,9 @@ async def mlflow_stop_hook(input_data, tool_use_id, context): loop = asyncio.get_running_loop() try: trace = await asyncio.wait_for( - loop.run_in_executor(None, process_transcript, transcript_path, session_id), + loop.run_in_executor( + None, process_transcript, transcript_path, session_id + ), timeout=120.0, ) except asyncio.TimeoutError: @@ -415,16 +441,24 @@ async def mlflow_stop_hook(input_data, tool_use_id, context): requested_model = os.environ.get("ANTHROPIC_MODEL", "") base_url = os.environ.get("ANTHROPIC_BASE_URL", "") if requested_model: - client.set_trace_tag(trace_id, "databricks.requested_model", requested_model) + client.set_trace_tag( + trace_id, "databricks.requested_model", requested_model + ) if base_url: - client.set_trace_tag(trace_id, "databricks.model_serving_endpoint", base_url) - client.set_trace_tag(trace_id, "mlflow.source", "skill-test-agent-eval") + client.set_trace_tag( + trace_id, "databricks.model_serving_endpoint", base_url + ) + client.set_trace_tag( + trace_id, "mlflow.source", "skill-test-agent-eval" + ) if skill_name: client.set_trace_tag(trace_id, "skill_name", skill_name) except Exception as tag_err: print(f" [MLflow] Warning: could not add tags: {tag_err}") else: - print(" [MLflow] Warning: process_transcript returned None (empty transcript?)") + print( + " [MLflow] Warning: process_transcript returned None (empty transcript?)" + ) except Exception as e: print(f" [MLflow] Error processing transcript: {e}") @@ -515,7 +549,9 @@ async def run_agent( server_cfg["env"] = mcp_env # Set up MLflow tracing via Stop hook - mlflow_hook, mlflow_result = _get_mlflow_stop_hook(mlflow_experiment=mlflow_experiment, skill_name=skill_name) + mlflow_hook, mlflow_result = _get_mlflow_stop_hook( + mlflow_experiment=mlflow_experiment, skill_name=skill_name + ) hooks = {} if mlflow_hook: hooks["Stop"] = [HookMatcher(hooks=[mlflow_hook])] @@ -568,8 +604,12 @@ def _stderr_callback(line: str): usage_data = { "input_tokens": getattr(msg.usage, "input_tokens", 0), "output_tokens": getattr(msg.usage, "output_tokens", 0), - "cache_creation_input_tokens": getattr(msg.usage, "cache_creation_input_tokens", 0), - "cache_read_input_tokens": getattr(msg.usage, "cache_read_input_tokens", 0), + "cache_creation_input_tokens": getattr( + msg.usage, "cache_creation_input_tokens", 0 + ), + "cache_read_input_tokens": getattr( + msg.usage, "cache_read_input_tokens", 0 + ), } events.append( AgentEvent( @@ -597,7 +637,9 @@ def _stderr_callback(line: str): data={ "id": block.id, "name": block.name, - "input": block.input if isinstance(block.input, dict) else {}, + "input": block.input + if isinstance(block.input, dict) + else {}, }, ) ) @@ -607,7 +649,9 @@ def _stderr_callback(line: str): type="tool_result", timestamp=now, data={ - "tool_use_id": getattr(block, "tool_use_id", ""), + "tool_use_id": getattr( + block, "tool_use_id", "" + ), "content": getattr(block, "content", ""), "is_error": getattr(block, "is_error", False), }, @@ -623,7 +667,9 @@ def _stderr_callback(line: str): type="tool_result", timestamp=now, data={ - "tool_use_id": getattr(block, "tool_use_id", ""), + "tool_use_id": getattr( + block, "tool_use_id", "" + ), "content": getattr(block, "content", ""), "is_error": getattr(block, "is_error", False), }, @@ -706,7 +752,9 @@ def _stderr_callback(line: str): _fluent_logger.setLevel(_logging.CRITICAL) _flush_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) try: - _flush_fut = _flush_pool.submit(mlflow.flush_trace_async_logging, terminate=False) + _flush_fut = _flush_pool.submit( + mlflow.flush_trace_async_logging, terminate=False + ) _flush_fut.result(timeout=30) _flush_pool.shutdown(wait=True) except concurrent.futures.TimeoutError: @@ -755,7 +803,9 @@ def _thread_target(): for task in pending: task.cancel() if pending: - loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) + loop.run_until_complete( + asyncio.gather(*pending, return_exceptions=True) + ) loop.run_until_complete(loop.shutdown_asyncgens()) # Don't block on shutdown_default_executor() — it waits for # all tasks submitted via run_in_executor(None, ...), including @@ -763,7 +813,9 @@ def _thread_target(): # This avoids a deadlock where the default executor can't shut # down because process_transcript is still running. try: - loop.run_until_complete(asyncio.wait_for(loop.shutdown_default_executor(), timeout=5.0)) + loop.run_until_complete( + asyncio.wait_for(loop.shutdown_default_executor(), timeout=5.0) + ) except (asyncio.TimeoutError, Exception): pass # Let the default executor GC naturally except Exception: diff --git a/.test/src/skill_test/optimize/agent_evaluator.py b/.test/src/skill_test/optimize/agent_evaluator.py index 5cfa4feb..532dca7d 100644 --- a/.test/src/skill_test/optimize/agent_evaluator.py +++ b/.test/src/skill_test/optimize/agent_evaluator.py @@ -266,7 +266,9 @@ def _evaluate( # Decode expectations expectations: dict[str, Any] = {} - expectations_json = example.get("additional_context", {}).get("expectations", "") + expectations_json = example.get("additional_context", {}).get( + "expectations", "" + ) if expectations_json: try: expectations = json.loads(expectations_json) @@ -287,7 +289,9 @@ def _evaluate( # Phase 2: Run agent WITHOUT skill (cached) logger.info("Running agent WITHOUT skill (cached if available)...") - without_response, without_trace, without_mlflow_trace = self._get_baseline(prompt) + without_response, without_trace, without_mlflow_trace = self._get_baseline( + prompt + ) with_response = with_result.response_text with_trace = with_result.trace_metrics.to_dict() @@ -300,16 +304,18 @@ def _evaluate( facts_str = "\n".join(f"- {f}" for f in facts) if facts else "None specified" patterns_str = ( "\n".join( - f"- {p}" if isinstance(p, str) else f"- {p.get('description', p.get('pattern', ''))}" + f"- {p}" + if isinstance(p, str) + else f"- {p.get('description', p.get('pattern', ''))}" for p in patterns ) if patterns else "None specified" ) - guidelines_str = "\n".join(f"- {g}" for g in guidelines) if guidelines else "None specified" - expectations_text = ( - f"Expected facts:\n{facts_str}\n\nExpected patterns:\n{patterns_str}\n\nGuidelines:\n{guidelines_str}" + guidelines_str = ( + "\n".join(f"- {g}" for g in guidelines) if guidelines else "None specified" ) + expectations_text = f"Expected facts:\n{facts_str}\n\nExpected patterns:\n{patterns_str}\n\nGuidelines:\n{guidelines_str}" expectations_dict = {"criteria": expectations_text} baseline_key = _prompt_hash(prompt) @@ -321,8 +327,12 @@ def _evaluate( # Circuit breaker: after first failure, skip trace judges entirely # to avoid wasting API calls on a model that can't handle them. def _judge_with_fallback( - trace_judge, field_judge, *, - mlflow_trace, response_text, judge_name, + trace_judge, + field_judge, + *, + mlflow_trace, + response_text, + judge_name, ) -> JudgeFeedback: """Try trace-based judge, fall back to field-based on failure.""" with self._cache_lock: @@ -357,16 +367,20 @@ def _judge_with_fallback( # Correctness: WITH + WITHOUT (WITHOUT cached) correctness_with_fb = _judge_with_fallback( - self._trace_correctness_judge, self._field_correctness_judge, + self._trace_correctness_judge, + self._field_correctness_judge, mlflow_trace=with_result.mlflow_trace, response_text=with_response, judge_name="correctness_with", ) with self._cache_lock: - need_correctness_baseline = baseline_key not in self._baseline_correctness_cache + need_correctness_baseline = ( + baseline_key not in self._baseline_correctness_cache + ) if need_correctness_baseline: fb = _judge_with_fallback( - self._trace_correctness_judge, self._field_correctness_judge, + self._trace_correctness_judge, + self._field_correctness_judge, mlflow_trace=without_mlflow_trace, response_text=without_response, judge_name="correctness_without", @@ -379,16 +393,20 @@ def _judge_with_fallback( # Completeness: WITH + WITHOUT (WITHOUT cached) completeness_with_fb = _judge_with_fallback( - self._trace_completeness_judge, self._field_completeness_judge, + self._trace_completeness_judge, + self._field_completeness_judge, mlflow_trace=with_result.mlflow_trace, response_text=with_response, judge_name="completeness_with", ) with self._cache_lock: - need_completeness_baseline = baseline_key not in self._baseline_completeness_cache + need_completeness_baseline = ( + baseline_key not in self._baseline_completeness_cache + ) if need_completeness_baseline: fb = _judge_with_fallback( - self._trace_completeness_judge, self._field_completeness_judge, + self._trace_completeness_judge, + self._field_completeness_judge, mlflow_trace=without_mlflow_trace, response_text=without_response, judge_name="completeness_without", @@ -401,7 +419,8 @@ def _judge_with_fallback( # Guideline adherence: WITH only guideline_adherence_fb = _judge_with_fallback( - self._trace_guideline_judge, self._field_guideline_judge, + self._trace_guideline_judge, + self._field_guideline_judge, mlflow_trace=with_result.mlflow_trace, response_text=with_response, judge_name="guideline_adherence", @@ -444,7 +463,10 @@ def _judge_with_fallback( reg_val = regression_fb.value if isinstance(reg_val, bool): regression_penalty = 1.0 if reg_val else 0.0 - elif isinstance(reg_val, str) and reg_val.strip().lower() in ("yes", "true"): + elif isinstance(reg_val, str) and reg_val.strip().lower() in ( + "yes", + "true", + ): regression_penalty = 1.0 # Phase 4: Deterministic fact/pattern assertions (zero LLM cost — static spine) @@ -452,14 +474,28 @@ def _judge_with_fallback( without_assertion_results = run_all_assertions(without_response, expectations) fact_results = [r for r in with_assertion_results if r.assertion_type == "fact"] - pattern_results = [r for r in with_assertion_results if r.assertion_type == "pattern"] - fact_score = sum(1 for r in fact_results if r.passed) / len(fact_results) if fact_results else 1.0 - pattern_score = sum(1 for r in pattern_results if r.passed) / len(pattern_results) if pattern_results else 1.0 + pattern_results = [ + r for r in with_assertion_results if r.assertion_type == "pattern" + ] + fact_score = ( + sum(1 for r in fact_results if r.passed) / len(fact_results) + if fact_results + else 1.0 + ) + pattern_score = ( + sum(1 for r in pattern_results if r.passed) / len(pattern_results) + if pattern_results + else 1.0 + ) - failure_summary = summarize_failures(with_assertion_results, without_assertion_results) + failure_summary = summarize_failures( + with_assertion_results, without_assertion_results + ) # Phase 5: Deterministic trace scorers (static spine) - behavioral_score, behavioral_details = _run_behavioral_scorers(with_trace, trace_expectations) + behavioral_score, behavioral_details = _run_behavioral_scorers( + with_trace, trace_expectations + ) execution_success = _compute_execution_success(with_result) # Phase 6: Token efficiency @@ -478,19 +514,25 @@ def _judge_with_fallback( token_efficiency = 1.0 # Composite score: trace judges subsume tool_correctness + behavioral - quality_composite = (correctness_with + completeness_with + guideline_adherence_score) / 3.0 + quality_composite = ( + correctness_with + completeness_with + guideline_adherence_score + ) / 3.0 assertion_coverage = 0.5 * fact_score + 0.5 * pattern_score - final_score = max(0.0, min(1.0, - 0.25 * effectiveness_delta - + 0.20 * correctness_with - + 0.15 * completeness_with - + 0.15 * guideline_adherence_score - + 0.10 * assertion_coverage - + 0.05 * execution_success - + 0.05 * token_efficiency - - 0.05 * regression_penalty - )) + final_score = max( + 0.0, + min( + 1.0, + 0.25 * effectiveness_delta + + 0.20 * correctness_with + + 0.15 * completeness_with + + 0.15 * guideline_adherence_score + + 0.10 * assertion_coverage + + 0.05 * execution_success + + 0.05 * token_efficiency + - 0.05 * regression_penalty, + ), + ) # Build rich side_info for GEPA reflection side_info: dict[str, Any] = {} @@ -541,9 +583,13 @@ def _judge_with_fallback( # Assertion-based structured feedback side_info["Missing_Facts"] = [r.rationale for r in fact_results if not r.passed] - side_info["Missing_Patterns"] = [r.rationale for r in pattern_results if not r.passed] + side_info["Missing_Patterns"] = [ + r.rationale for r in pattern_results if not r.passed + ] side_info["Passed_Facts"] = [r.rationale for r in fact_results if r.passed] - side_info["Passed_Patterns"] = [r.rationale for r in pattern_results if r.passed] + side_info["Passed_Patterns"] = [ + r.rationale for r in pattern_results if r.passed + ] if failure_summary.get("Error") or failure_summary.get("Regressions"): side_info["skill_md_specific_info"] = { @@ -596,7 +642,9 @@ def _judge_with_fallback( side_info["token_counts"]["budget"] = self._token_budget # Diagnostic labels - weakest_dim = "correctness" if correctness_with <= completeness_with else "completeness" + weakest_dim = ( + "correctness" if correctness_with <= completeness_with else "completeness" + ) weakest_score = min(correctness_with, completeness_with) if failure_summary.get("Error"): @@ -607,7 +655,11 @@ def _judge_with_fallback( regressed_dims.append(f"correctness({correctness_delta:+.2f})") if completeness_delta < -0.05: regressed_dims.append(f"completeness({completeness_delta:+.2f})") - dims_str = ", ".join(regressed_dims) if regressed_dims else f"overall({effectiveness_delta:+.2f})" + dims_str = ( + ", ".join(regressed_dims) + if regressed_dims + else f"overall({effectiveness_delta:+.2f})" + ) side_info["Error"] = ( f"REGRESSION: {dims_str}. " f"correctness: {correctness_with:.2f} (was {correctness_without:.2f}), " @@ -649,7 +701,9 @@ def create_agent_evaluator( skill_guidelines = _collect_skill_guidelines(skill_name) if skill_guidelines: - logger.info("Loaded %d domain guidelines for agent trace judges", len(skill_guidelines)) + logger.info( + "Loaded %d domain guidelines for agent trace judges", len(skill_guidelines) + ) return AgentEvaluator( original_token_counts=original_token_counts, @@ -680,7 +734,9 @@ def build_agent_eval_background( baseline_desc = "" if baseline_scores: mean_score = sum(baseline_scores.values()) / len(baseline_scores) - baseline_desc = f"\nBASELINE: mean {mean_score:.3f} across {len(baseline_scores)} tasks." + baseline_desc = ( + f"\nBASELINE: mean {mean_score:.3f} across {len(baseline_scores)} tasks." + ) if baseline_side_info: needs_skill_ids = [] @@ -695,7 +751,9 @@ def build_agent_eval_background( behavioral = info.get("behavioral_scores", {}) for scorer_name, result in behavioral.items(): if result.get("value") == "no": - tool_issues.append(f"{tid}: {scorer_name} - {result.get('rationale', '')[:80]}") + tool_issues.append( + f"{tid}: {scorer_name} - {result.get('rationale', '')[:80]}" + ) if needs_skill_ids: baseline_desc += f"\n NEEDS_SKILL ({len(needs_skill_ids)} tasks): {', '.join(needs_skill_ids[:5])}" diff --git a/.test/src/skill_test/optimize/judges.py b/.test/src/skill_test/optimize/judges.py index 18940d28..66db622e 100644 --- a/.test/src/skill_test/optimize/judges.py +++ b/.test/src/skill_test/optimize/judges.py @@ -42,7 +42,9 @@ logger = logging.getLogger(__name__) -DEFAULT_JUDGE_LM = os.environ.get("GEPA_JUDGE_LM", "databricks:/databricks-claude-sonnet-4-6") +DEFAULT_JUDGE_LM = os.environ.get( + "GEPA_JUDGE_LM", "databricks:/databricks-claude-sonnet-4-6" +) # --------------------------------------------------------------------------- # Fallback model chain for rate limit errors @@ -87,6 +89,7 @@ def _is_rate_limit_error(exc: Exception) -> bool: # AI Gateway support # --------------------------------------------------------------------------- + def _get_gateway_base_url() -> str | None: """Return the AI Gateway base URL if configured, else None. @@ -123,7 +126,9 @@ def _to_litellm_model(model: str) -> tuple[str, str | None, str | None]: if gateway and model.startswith("databricks/"): # Route through AI Gateway as OpenAI-compatible endpoint endpoint_name = model.split("/", 1)[1] - api_key = os.environ.get("DATABRICKS_TOKEN") or os.environ.get("DATABRICKS_API_KEY", "") + api_key = os.environ.get("DATABRICKS_TOKEN") or os.environ.get( + "DATABRICKS_API_KEY", "" + ) return f"openai/{endpoint_name}", gateway, api_key or None return model, None, None @@ -151,7 +156,9 @@ def _judge_inference_params() -> dict[str, Any] | None: """Build inference_params for make_judge if AI Gateway is configured.""" gateway = _get_gateway_base_url() if gateway: - api_key = os.environ.get("DATABRICKS_TOKEN") or os.environ.get("DATABRICKS_API_KEY", "") + api_key = os.environ.get("DATABRICKS_TOKEN") or os.environ.get( + "DATABRICKS_API_KEY", "" + ) params: dict[str, Any] = {"base_url": gateway} if api_key: params["api_key"] = api_key @@ -173,7 +180,9 @@ def _to_judge_model_and_params(model: str) -> tuple[str, dict[str, Any] | None]: endpoint_name = model.split(":/", 1)[1] else: endpoint_name = model.split("/", 1)[1] - api_key = os.environ.get("DATABRICKS_TOKEN") or os.environ.get("DATABRICKS_API_KEY", "") + api_key = os.environ.get("DATABRICKS_TOKEN") or os.environ.get( + "DATABRICKS_API_KEY", "" + ) params: dict[str, Any] = {"base_url": gateway} if api_key: params["api_key"] = api_key @@ -360,12 +369,20 @@ def create_correctness_judge( instructions = _CORRECTNESS_INSTRUCTIONS if skill_guidelines: # Filter for correctness-related guidelines - filtered = [g for g in skill_guidelines if any(kw in g.lower() for kw in _CORRECTNESS_KEYWORDS)] + filtered = [ + g + for g in skill_guidelines + if any(kw in g.lower() for kw in _CORRECTNESS_KEYWORDS) + ] if filtered: principles = "\n".join(f"- {g}" for g in filtered) - instructions += f"\n\n## Domain-Specific Correctness Principles\n{principles}\n" + instructions += ( + f"\n\n## Domain-Specific Correctness Principles\n{principles}\n" + ) - model_uri, inference_params = _to_judge_model_and_params(judge_model or DEFAULT_JUDGE_LM) + model_uri, inference_params = _to_judge_model_and_params( + judge_model or DEFAULT_JUDGE_LM + ) return make_judge( name="skill_correctness", model=model_uri, @@ -425,7 +442,9 @@ def create_completeness_judge( Args: judge_model: LLM model for the judge. """ - model_uri, inference_params = _to_judge_model_and_params(judge_model or DEFAULT_JUDGE_LM) + model_uri, inference_params = _to_judge_model_and_params( + judge_model or DEFAULT_JUDGE_LM + ) return make_judge( name="skill_completeness", model=model_uri, @@ -495,7 +514,9 @@ def create_guideline_adherence_judge( principles = "\n".join(f"- {g}" for g in skill_guidelines) instructions += f"\n\n## Required Guidelines\n{principles}\n" - model_uri, inference_params = _to_judge_model_and_params(judge_model or DEFAULT_JUDGE_LM) + model_uri, inference_params = _to_judge_model_and_params( + judge_model or DEFAULT_JUDGE_LM + ) return make_judge( name="skill_guideline_adherence", model=model_uri, @@ -548,7 +569,9 @@ def create_regression_judge(judge_model: str | None = None) -> Any: judge_model: LLM model for the judge. Defaults to GEPA_JUDGE_LM env or databricks/databricks-claude-sonnet-4-6. """ - model_uri, inference_params = _to_judge_model_and_params(judge_model or DEFAULT_JUDGE_LM) + model_uri, inference_params = _to_judge_model_and_params( + judge_model or DEFAULT_JUDGE_LM + ) return make_judge( name="skill_regression", model=model_uri, @@ -607,7 +630,9 @@ def _call_judge(j): fb = future.result(timeout=timeout) except concurrent.futures.TimeoutError: logger.warning("Judge '%s' timed out after %ds", name, timeout) - return JudgeFeedback(value=0.0, rationale=f"Judge timed out after {timeout}s", name=name) + return JudgeFeedback( + value=0.0, rationale=f"Judge timed out after {timeout}s", name=name + ) finally: # shutdown(wait=False) so a still-running judge thread doesn't block pool.shutdown(wait=False) @@ -618,7 +643,9 @@ def _call_judge(j): ) except concurrent.futures.TimeoutError: # Already handled above, but keep for safety - return JudgeFeedback(value=0.0, rationale=f"Judge timed out after {timeout}s", name=name) + return JudgeFeedback( + value=0.0, rationale=f"Judge timed out after {timeout}s", name=name + ) except Exception as e: pool.shutdown(wait=False) if not _is_rate_limit_error(e): @@ -645,11 +672,17 @@ def _call_judge(j): fb = future.result(timeout=timeout) except concurrent.futures.TimeoutError: fb_pool.shutdown(wait=False) - logger.warning("Fallback '%s' timed out after %ds, trying next", fallback_model, timeout) + logger.warning( + "Fallback '%s' timed out after %ds, trying next", + fallback_model, + timeout, + ) continue finally: fb_pool.shutdown(wait=False) - logger.info("Judge '%s' succeeded with fallback model '%s'", name, fallback_model) + logger.info( + "Judge '%s' succeeded with fallback model '%s'", name, fallback_model + ) return JudgeFeedback( value=fb.value, rationale=fb.rationale or "", @@ -657,7 +690,9 @@ def _call_judge(j): ) except Exception as fallback_err: if _is_rate_limit_error(fallback_err): - logger.warning("Fallback '%s' also rate limited, trying next", fallback_model) + logger.warning( + "Fallback '%s' also rate limited, trying next", fallback_model + ) continue logger.warning("Fallback '%s' failed: %s", fallback_model, fallback_err) continue diff --git a/.test/src/skill_test/optimize/runner.py b/.test/src/skill_test/optimize/runner.py index 223e63cb..75729e78 100644 --- a/.test/src/skill_test/optimize/runner.py +++ b/.test/src/skill_test/optimize/runner.py @@ -90,8 +90,12 @@ def _compute_diff_summary(original: str, optimized: str) -> str: if not diff: return "No changes" - added = sum(1 for line in diff if line.startswith("+") and not line.startswith("+++")) - removed = sum(1 for line in diff if line.startswith("-") and not line.startswith("---")) + added = sum( + 1 for line in diff if line.startswith("+") and not line.startswith("+++") + ) + removed = sum( + 1 for line in diff if line.startswith("-") and not line.startswith("---") + ) parts = [] if added: @@ -101,7 +105,11 @@ def _compute_diff_summary(original: str, optimized: str) -> str: changed_sections = set() for line in diff: - content = line[1:].strip() if line.startswith(("+", "-")) and not line.startswith(("+++", "---")) else "" + content = ( + line[1:].strip() + if line.startswith(("+", "-")) and not line.startswith(("+++", "---")) + else "" + ) if content.startswith("#"): changed_sections.add(content) @@ -113,7 +121,9 @@ def _compute_diff_summary(original: str, optimized: str) -> str: return summary -def _evaluate_on_tasks(evaluator, candidate, tasks, label: str = "Evaluating", max_parallel: int = 1): +def _evaluate_on_tasks( + evaluator, candidate, tasks, label: str = "Evaluating", max_parallel: int = 1 +): """Run evaluator on tasks and return mean score, per-task scores, and per-task side_info. Args: @@ -159,18 +169,27 @@ def _eval_task(idx, inst, task_id): try: for future in concurrent.futures.as_completed(futures, timeout=900): try: - idx, task_id, inst, score, side_info = future.result(timeout=900) + idx, task_id, inst, score, side_info = future.result( + timeout=900 + ) except Exception as e: idx = futures[future] task_id = tasks[idx].get("id", f"task_{idx}") inst = gepa_instances[idx] - score, side_info = 0.0, {"_error": str(e), "scores": {"final": 0.0}} + score, side_info = ( + 0.0, + {"_error": str(e), "scores": {"final": 0.0}}, + ) logger.warning("Evaluator failed for task %s: %s", task_id, e) per_task[task_id] = score side_info_by_id[task_id] = side_info side_info_by_input[inst.get("input", f"task_{idx}")] = side_info completed += 1 - print(f"\r {label}: {completed}/{total} ({task_id})...", end="", flush=True) + print( + f"\r {label}: {completed}/{total} ({task_id})...", + end="", + flush=True, + ) except TimeoutError: # as_completed timeout — score remaining tasks as 0.0 for future, idx in futures.items(): @@ -179,12 +198,22 @@ def _eval_task(idx, inst, task_id): inst = gepa_instances[idx] per_task.setdefault(task_id, 0.0) side_info_by_id.setdefault( - task_id, {"_error": "as_completed timeout (900s)", "scores": {"final": 0.0}} + task_id, + { + "_error": "as_completed timeout (900s)", + "scores": {"final": 0.0}, + }, + ) + side_info_by_input.setdefault( + inst.get("input", f"task_{idx}"), side_info_by_id[task_id] ) - side_info_by_input.setdefault(inst.get("input", f"task_{idx}"), side_info_by_id[task_id]) future.cancel() - logger.warning("Task %s timed out in as_completed (900s)", task_id) - print(f"\n WARNING: {label} timed out after 900s — scoring remaining tasks as 0.0") + logger.warning( + "Task %s timed out in as_completed (900s)", task_id + ) + print( + f"\n WARNING: {label} timed out after 900s — scoring remaining tasks as 0.0" + ) pool.shutdown(wait=True) except Exception: pool.shutdown(wait=False) @@ -383,7 +412,9 @@ def optimize_skill( # Auto-derive AI Gateway URL from ANTHROPIC_BASE_URL if not explicitly set if not os.environ.get("DATABRICKS_AI_GATEWAY_URL"): - _anthropic_base = _agent_env.get("ANTHROPIC_BASE_URL", "") or os.environ.get("ANTHROPIC_BASE_URL", "") + _anthropic_base = _agent_env.get("ANTHROPIC_BASE_URL", "") or os.environ.get( + "ANTHROPIC_BASE_URL", "" + ) if "ai-gateway.cloud.databricks.com" in _anthropic_base: from urllib.parse import urlparse @@ -418,7 +449,9 @@ def optimize_skill( # Build read-only tool context string (for skill optimization) if tool_components: - tool_context_str = "\n\n".join(tool_components[k] for k in sorted(tool_components)) + tool_context_str = "\n\n".join( + tool_components[k] for k in sorted(tool_components) + ) # 2. Build seed_candidate (multi-component dict) seed_candidate: dict[str, str] = {} @@ -451,11 +484,17 @@ def optimize_skill( # 3. Load datasets if tools_only: # Cross-skill dataset for tool optimization - train = create_cross_skill_dataset(max_per_skill=max_per_skill or 5, tool_modules=tool_modules) + train = create_cross_skill_dataset( + max_per_skill=max_per_skill or 5, tool_modules=tool_modules + ) val = None if train: - source_skills = {t.get("metadata", {}).get("source_skill", "?") for t in train} - print(f"Cross-skill dataset: {len(train)} tasks from {len(source_skills)} skill(s)") + source_skills = { + t.get("metadata", {}).get("source_skill", "?") for t in train + } + print( + f"Cross-skill dataset: {len(train)} tasks from {len(source_skills)} skill(s)" + ) else: # Fall back to single-skill dataset try: @@ -487,7 +526,9 @@ def optimize_skill( if records: assessment_summary = summarize_assessment_patterns(records) assessment_by_task = match_assessments_to_tasks(records, train) - print(f"MLflow assessments: {len(records)} traces, {len(assessment_by_task)} tasks matched") + print( + f"MLflow assessments: {len(records)} traces, {len(assessment_by_task)} tasks matched" + ) if assessment_summary: print(f" {assessment_summary.splitlines()[0]}") else: @@ -505,7 +546,9 @@ def optimize_skill( print("Evaluator: skillbench (judge-driven)") if not effective_gen_model: - raise ValueError("SkillBench evaluator requires a gen_model. Pass --gen-model or set GEPA_GEN_LM env var.") + raise ValueError( + "SkillBench evaluator requires a gen_model. Pass --gen-model or set GEPA_GEN_LM env var." + ) evaluator = create_skillbench_evaluator( skill_name, gen_model=effective_gen_model, @@ -535,6 +578,7 @@ def optimize_skill( if _manifest_path.exists(): try: import yaml as _yaml + _manifest_data = _yaml.safe_load(_manifest_path.read_text()) or {} _manifest_tool_modules = _manifest_data.get("tool_modules") except Exception: @@ -558,7 +602,9 @@ def optimize_skill( evaluator = agent_evaluator print("Mode: agent-eval-full (agent for ALL iterations)") else: - print("Mode: agent-eval hybrid (proxy for GEPA, agent for baseline + validation)") + print( + "Mode: agent-eval hybrid (proxy for GEPA, agent for baseline + validation)" + ) # Determine parallelism for evaluator calls (agent evaluator only) _eval_max_parallel = parallel_agents if agent_eval_full else 1 @@ -628,7 +674,9 @@ def _refiner_lm_with_fallback(prompt): for comp, tokens in original_token_counts.items(): print(f" {comp}: {tokens:,} tokens") if tool_context_str: - print(f"Tool context (read-only): {count_tokens(tool_context_str):,} tokens") + print( + f"Tool context (read-only): {count_tokens(tool_context_str):,} tokens" + ) print(f"Train tasks: {len(train)}") print(f"Val tasks: {len(val) if val else 'None (single-task mode)'}") print(f"Generation model: {effective_gen_model}") @@ -643,7 +691,10 @@ def _refiner_lm_with_fallback(prompt): print(f"\nScoring baseline ({len(train)} tasks, ~5 LLM calls each)...") original_score, original_per_task, si_by_id, _ = _evaluate_on_tasks( - evaluator, seed_candidate, train, label="Baseline", + evaluator, + seed_candidate, + train, + label="Baseline", max_parallel=_eval_max_parallel, ) print(f"Current score: {original_score:.3f}") @@ -667,9 +718,14 @@ def _refiner_lm_with_fallback(prompt): dry_run_agent_si = None if agent_evaluator: print(f"\nAgent baseline ({len(train)} tasks)...") - dry_run_agent_score, agent_per_task, dry_run_agent_si, _ = _evaluate_on_tasks( - agent_evaluator, seed_candidate, train, label="Agent baseline", - max_parallel=parallel_agents, + dry_run_agent_score, agent_per_task, dry_run_agent_si, _ = ( + _evaluate_on_tasks( + agent_evaluator, + seed_candidate, + train, + label="Agent baseline", + max_parallel=parallel_agents, + ) ) print(f"Agent baseline score: {dry_run_agent_score:.3f}") for task_id, score in agent_per_task.items(): @@ -704,13 +760,17 @@ def _refiner_lm_with_fallback(prompt): _eval_desc = "2 agent runs + judges" if agent_eval_full else "~5 LLM calls" print(f"\nScoring {_eval_label.lower()} ({len(train)} tasks, {_eval_desc} each)...") original_score, original_per_task, si_by_id, si_by_input = _evaluate_on_tasks( - evaluator, seed_candidate, train, label=_eval_label, + evaluator, + seed_candidate, + train, + label=_eval_label, max_parallel=_eval_max_parallel, ) # 6. Build background and objective if agent_eval_full: from .agent_evaluator import build_agent_eval_background + background = build_agent_eval_background( skill_name, total_original_tokens, @@ -763,9 +823,14 @@ def _refiner_lm_with_fallback(prompt): # 6b. Agent baseline scoring (hybrid mode: before GEPA loop) if agent_evaluator and not agent_eval_full: print(f"\n Agent baseline scoring ({len(train)} tasks)...") - agent_baseline_score, agent_baseline_per_task, agent_baseline_si, _ = _evaluate_on_tasks( - agent_evaluator, seed_candidate, train, label="Agent baseline", - max_parallel=parallel_agents, + agent_baseline_score, agent_baseline_per_task, agent_baseline_si, _ = ( + _evaluate_on_tasks( + agent_evaluator, + seed_candidate, + train, + label="Agent baseline", + max_parallel=parallel_agents, + ) ) print(f" Agent baseline score: {agent_baseline_score:.3f}") for task_id, score in agent_baseline_per_task.items(): @@ -790,7 +855,11 @@ def _refiner_lm_with_fallback(prompt): ) # estimate_pass_duration expects the model name string, not the callable - _est_reflection_lm = _reflection_model_name if _reflection_model_name else str(reflection_lm or DEFAULT_GEN_LM) + _est_reflection_lm = ( + _reflection_model_name + if _reflection_model_name + else str(reflection_lm or DEFAULT_GEN_LM) + ) est_secs = estimate_pass_duration( config.engine.max_metric_calls, _est_reflection_lm, @@ -805,7 +874,9 @@ def _refiner_lm_with_fallback(prompt): ) for pass_num in range(1, max_passes + 1): - print(f"\n --- Pass {pass_num}/{max_passes} (best score so far: {best_score:.4f}) ---") + print( + f"\n --- Pass {pass_num}/{max_passes} (best score so far: {best_score:.4f}) ---" + ) pass_config = copy.deepcopy(config) @@ -826,12 +897,17 @@ def _refiner_lm_with_fallback(prompt): candidate = result.best_candidate pass_score, _, pass_si_by_id, _ = _evaluate_on_tasks( - evaluator, candidate, train, label=f"Pass {pass_num}", + evaluator, + candidate, + train, + label=f"Pass {pass_num}", max_parallel=_eval_max_parallel, ) improvement = pass_score - best_score - print(f" Pass {pass_num} score: {pass_score:.4f} (delta: {'+' if improvement >= 0 else ''}{improvement:.4f})") + print( + f" Pass {pass_num} score: {pass_score:.4f} (delta: {'+' if improvement >= 0 else ''}{improvement:.4f})" + ) if pass_score > best_score + improvement_threshold: best = dict(candidate) @@ -859,7 +935,10 @@ def _refiner_lm_with_fallback(prompt): val_scores: dict[str, float] = {} if val: _, val_scores, _, _ = _evaluate_on_tasks( - evaluator, best, val, label="Validation", + evaluator, + best, + val, + label="Validation", max_parallel=_eval_max_parallel, ) @@ -877,9 +956,14 @@ def _refiner_lm_with_fallback(prompt): if agent_evaluator and not agent_eval_full: print(f"\n Agent validation scoring ({len(train)} tasks on best candidate)...") - agent_validation_score, agent_val_per_task, agent_validation_si, _ = _evaluate_on_tasks( - agent_evaluator, best, train, label="Agent validation", - max_parallel=parallel_agents, + agent_validation_score, agent_val_per_task, agent_validation_si, _ = ( + _evaluate_on_tasks( + agent_evaluator, + best, + train, + label="Agent validation", + max_parallel=parallel_agents, + ) ) print(f" Agent validation score: {agent_validation_score:.3f}") for task_id, score in agent_val_per_task.items(): From b62062c8933729aafe2b398b6f701292e42e1816 Mon Sep 17 00:00:00 2001 From: calreynolds Date: Mon, 16 Mar 2026 16:46:47 -0400 Subject: [PATCH 4/6] style: fix line length violation in agent_evaluator.py Co-authored-by: Isaac --- .test/src/skill_test/optimize/agent_evaluator.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.test/src/skill_test/optimize/agent_evaluator.py b/.test/src/skill_test/optimize/agent_evaluator.py index 532dca7d..50b989d0 100644 --- a/.test/src/skill_test/optimize/agent_evaluator.py +++ b/.test/src/skill_test/optimize/agent_evaluator.py @@ -315,7 +315,10 @@ def _evaluate( guidelines_str = ( "\n".join(f"- {g}" for g in guidelines) if guidelines else "None specified" ) - expectations_text = f"Expected facts:\n{facts_str}\n\nExpected patterns:\n{patterns_str}\n\nGuidelines:\n{guidelines_str}" + expectations_text = ( + f"Expected facts:\n{facts_str}\n\nExpected patterns:\n{patterns_str}" + f"\n\nGuidelines:\n{guidelines_str}" + ) expectations_dict = {"criteria": expectations_text} baseline_key = _prompt_hash(prompt) From ac47090077205a7bf321a75844e9f60e1e35bedd Mon Sep 17 00:00:00 2001 From: calreynolds Date: Mon, 16 Mar 2026 16:48:16 -0400 Subject: [PATCH 5/6] style: apply ruff format (line-length=120) to test files Co-authored-by: Isaac --- .test/src/skill_test/agent/executor.py | 95 ++++--------- .../skill_test/optimize/agent_evaluator.py | 98 ++++---------- .../src/skill_test/optimize/eval_criteria.py | 13 +- .test/src/skill_test/optimize/judges.py | 94 +++---------- .test/src/skill_test/optimize/runner.py | 126 ++++++------------ 5 files changed, 108 insertions(+), 318 deletions(-) diff --git a/.test/src/skill_test/agent/executor.py b/.test/src/skill_test/agent/executor.py index c42f6fdf..283174f4 100644 --- a/.test/src/skill_test/agent/executor.py +++ b/.test/src/skill_test/agent/executor.py @@ -104,11 +104,7 @@ def _build_trace_metrics( if tool_use_id in tool_calls_by_id: tc = tool_calls_by_id[tool_use_id] - tc.result = ( - result_text[:2000] - if isinstance(result_text, str) - else str(result_text)[:2000] - ) + tc.result = result_text[:2000] if isinstance(result_text, str) else str(result_text)[:2000] tc.success = not is_error # Extract file operations from tool results @@ -118,23 +114,17 @@ def _build_trace_metrics( fp = tool_input.get("file_path", "") if fp: metrics.files_created.append(fp) - metrics.file_operations.append( - FileOperation(type="create", file_path=fp, timestamp=ts) - ) + metrics.file_operations.append(FileOperation(type="create", file_path=fp, timestamp=ts)) elif tool_name == "Edit" and tc.success: fp = tool_input.get("file_path", "") if fp: metrics.files_modified.append(fp) - metrics.file_operations.append( - FileOperation(type="edit", file_path=fp, timestamp=ts) - ) + metrics.file_operations.append(FileOperation(type="edit", file_path=fp, timestamp=ts)) elif tool_name == "Read": fp = tool_input.get("file_path", "") if fp: metrics.files_read.append(fp) - metrics.file_operations.append( - FileOperation(type="read", file_path=fp, timestamp=ts) - ) + metrics.file_operations.append(FileOperation(type="read", file_path=fp, timestamp=ts)) elif event.type == "assistant_turn": num_turns += 1 @@ -207,10 +197,7 @@ def _load_mcp_config() -> dict[str, Any]: resolved_cfg[key] = val.replace("${CLAUDE_PLUGIN_ROOT}", str(repo_root)) elif isinstance(val, list): resolved_cfg[key] = [ - v.replace("${CLAUDE_PLUGIN_ROOT}", str(repo_root)) - if isinstance(v, str) - else v - for v in val + v.replace("${CLAUDE_PLUGIN_ROOT}", str(repo_root)) if isinstance(v, str) else v for v in val ] else: resolved_cfg[key] = val @@ -317,9 +304,7 @@ def _get_agent_env() -> dict[str, str]: return env -def _get_mlflow_stop_hook( - mlflow_experiment: str | None = None, skill_name: str | None = None -): +def _get_mlflow_stop_hook(mlflow_experiment: str | None = None, skill_name: str | None = None): """Create an MLflow Stop hook that processes the transcript into a real trace. Mirrors the pattern from databricks-builder-app/server/services/agent.py: @@ -376,9 +361,7 @@ def _get_mlflow_stop_hook( try: mlflow.set_experiment(experiment_name) except Exception as e: - logger.warning( - "MLflow set_experiment('%s') failed: %s", experiment_name, e - ) + logger.warning("MLflow set_experiment('%s') failed: %s", experiment_name, e) try: mlflow.create_experiment(experiment_name) mlflow.set_experiment(experiment_name) @@ -391,9 +374,7 @@ def _get_mlflow_stop_hook( ) return None, None - print( - f" [MLflow] Tracing configured: uri={tracking_uri} experiment={experiment_name}" - ) + print(f" [MLflow] Tracing configured: uri={tracking_uri} experiment={experiment_name}") _mlflow_env_configured = True async def mlflow_stop_hook(input_data, tool_use_id, context): @@ -401,9 +382,7 @@ async def mlflow_stop_hook(input_data, tool_use_id, context): session_id = input_data.get("session_id") transcript_path = input_data.get("transcript_path") - print( - f" [MLflow] Stop hook fired: session={session_id}, transcript={transcript_path}" - ) + print(f" [MLflow] Stop hook fired: session={session_id}, transcript={transcript_path}") try: # Ensure MLflow is set up (matches builder app: call every time) @@ -415,9 +394,7 @@ async def mlflow_stop_hook(input_data, tool_use_id, context): loop = asyncio.get_running_loop() try: trace = await asyncio.wait_for( - loop.run_in_executor( - None, process_transcript, transcript_path, session_id - ), + loop.run_in_executor(None, process_transcript, transcript_path, session_id), timeout=120.0, ) except asyncio.TimeoutError: @@ -441,24 +418,16 @@ async def mlflow_stop_hook(input_data, tool_use_id, context): requested_model = os.environ.get("ANTHROPIC_MODEL", "") base_url = os.environ.get("ANTHROPIC_BASE_URL", "") if requested_model: - client.set_trace_tag( - trace_id, "databricks.requested_model", requested_model - ) + client.set_trace_tag(trace_id, "databricks.requested_model", requested_model) if base_url: - client.set_trace_tag( - trace_id, "databricks.model_serving_endpoint", base_url - ) - client.set_trace_tag( - trace_id, "mlflow.source", "skill-test-agent-eval" - ) + client.set_trace_tag(trace_id, "databricks.model_serving_endpoint", base_url) + client.set_trace_tag(trace_id, "mlflow.source", "skill-test-agent-eval") if skill_name: client.set_trace_tag(trace_id, "skill_name", skill_name) except Exception as tag_err: print(f" [MLflow] Warning: could not add tags: {tag_err}") else: - print( - " [MLflow] Warning: process_transcript returned None (empty transcript?)" - ) + print(" [MLflow] Warning: process_transcript returned None (empty transcript?)") except Exception as e: print(f" [MLflow] Error processing transcript: {e}") @@ -549,9 +518,7 @@ async def run_agent( server_cfg["env"] = mcp_env # Set up MLflow tracing via Stop hook - mlflow_hook, mlflow_result = _get_mlflow_stop_hook( - mlflow_experiment=mlflow_experiment, skill_name=skill_name - ) + mlflow_hook, mlflow_result = _get_mlflow_stop_hook(mlflow_experiment=mlflow_experiment, skill_name=skill_name) hooks = {} if mlflow_hook: hooks["Stop"] = [HookMatcher(hooks=[mlflow_hook])] @@ -604,12 +571,8 @@ def _stderr_callback(line: str): usage_data = { "input_tokens": getattr(msg.usage, "input_tokens", 0), "output_tokens": getattr(msg.usage, "output_tokens", 0), - "cache_creation_input_tokens": getattr( - msg.usage, "cache_creation_input_tokens", 0 - ), - "cache_read_input_tokens": getattr( - msg.usage, "cache_read_input_tokens", 0 - ), + "cache_creation_input_tokens": getattr(msg.usage, "cache_creation_input_tokens", 0), + "cache_read_input_tokens": getattr(msg.usage, "cache_read_input_tokens", 0), } events.append( AgentEvent( @@ -637,9 +600,7 @@ def _stderr_callback(line: str): data={ "id": block.id, "name": block.name, - "input": block.input - if isinstance(block.input, dict) - else {}, + "input": block.input if isinstance(block.input, dict) else {}, }, ) ) @@ -649,9 +610,7 @@ def _stderr_callback(line: str): type="tool_result", timestamp=now, data={ - "tool_use_id": getattr( - block, "tool_use_id", "" - ), + "tool_use_id": getattr(block, "tool_use_id", ""), "content": getattr(block, "content", ""), "is_error": getattr(block, "is_error", False), }, @@ -667,9 +626,7 @@ def _stderr_callback(line: str): type="tool_result", timestamp=now, data={ - "tool_use_id": getattr( - block, "tool_use_id", "" - ), + "tool_use_id": getattr(block, "tool_use_id", ""), "content": getattr(block, "content", ""), "is_error": getattr(block, "is_error", False), }, @@ -752,9 +709,7 @@ def _stderr_callback(line: str): _fluent_logger.setLevel(_logging.CRITICAL) _flush_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) try: - _flush_fut = _flush_pool.submit( - mlflow.flush_trace_async_logging, terminate=False - ) + _flush_fut = _flush_pool.submit(mlflow.flush_trace_async_logging, terminate=False) _flush_fut.result(timeout=30) _flush_pool.shutdown(wait=True) except concurrent.futures.TimeoutError: @@ -803,9 +758,7 @@ def _thread_target(): for task in pending: task.cancel() if pending: - loop.run_until_complete( - asyncio.gather(*pending, return_exceptions=True) - ) + loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) loop.run_until_complete(loop.shutdown_asyncgens()) # Don't block on shutdown_default_executor() — it waits for # all tasks submitted via run_in_executor(None, ...), including @@ -813,9 +766,7 @@ def _thread_target(): # This avoids a deadlock where the default executor can't shut # down because process_transcript is still running. try: - loop.run_until_complete( - asyncio.wait_for(loop.shutdown_default_executor(), timeout=5.0) - ) + loop.run_until_complete(asyncio.wait_for(loop.shutdown_default_executor(), timeout=5.0)) except (asyncio.TimeoutError, Exception): pass # Let the default executor GC naturally except Exception: diff --git a/.test/src/skill_test/optimize/agent_evaluator.py b/.test/src/skill_test/optimize/agent_evaluator.py index 50b989d0..2651a4ab 100644 --- a/.test/src/skill_test/optimize/agent_evaluator.py +++ b/.test/src/skill_test/optimize/agent_evaluator.py @@ -189,15 +189,9 @@ def __init__( ) # --- Field-based judges (fallback — when mlflow_trace is None) --- - self._field_correctness_judge = create_correctness_judge( - skill_guidelines, judge_model=judge_model - ) - self._field_completeness_judge = create_completeness_judge( - judge_model=judge_model - ) - self._field_guideline_judge = create_guideline_adherence_judge( - skill_guidelines, judge_model=judge_model - ) + self._field_correctness_judge = create_correctness_judge(skill_guidelines, judge_model=judge_model) + self._field_completeness_judge = create_completeness_judge(judge_model=judge_model) + self._field_guideline_judge = create_guideline_adherence_judge(skill_guidelines, judge_model=judge_model) self._regression_judge = create_regression_judge(judge_model=judge_model) @@ -266,9 +260,7 @@ def _evaluate( # Decode expectations expectations: dict[str, Any] = {} - expectations_json = example.get("additional_context", {}).get( - "expectations", "" - ) + expectations_json = example.get("additional_context", {}).get("expectations", "") if expectations_json: try: expectations = json.loads(expectations_json) @@ -289,9 +281,7 @@ def _evaluate( # Phase 2: Run agent WITHOUT skill (cached) logger.info("Running agent WITHOUT skill (cached if available)...") - without_response, without_trace, without_mlflow_trace = self._get_baseline( - prompt - ) + without_response, without_trace, without_mlflow_trace = self._get_baseline(prompt) with_response = with_result.response_text with_trace = with_result.trace_metrics.to_dict() @@ -304,20 +294,14 @@ def _evaluate( facts_str = "\n".join(f"- {f}" for f in facts) if facts else "None specified" patterns_str = ( "\n".join( - f"- {p}" - if isinstance(p, str) - else f"- {p.get('description', p.get('pattern', ''))}" - for p in patterns + f"- {p}" if isinstance(p, str) else f"- {p.get('description', p.get('pattern', ''))}" for p in patterns ) if patterns else "None specified" ) - guidelines_str = ( - "\n".join(f"- {g}" for g in guidelines) if guidelines else "None specified" - ) + guidelines_str = "\n".join(f"- {g}" for g in guidelines) if guidelines else "None specified" expectations_text = ( - f"Expected facts:\n{facts_str}\n\nExpected patterns:\n{patterns_str}" - f"\n\nGuidelines:\n{guidelines_str}" + f"Expected facts:\n{facts_str}\n\nExpected patterns:\n{patterns_str}\n\nGuidelines:\n{guidelines_str}" ) expectations_dict = {"criteria": expectations_text} @@ -377,9 +361,7 @@ def _judge_with_fallback( judge_name="correctness_with", ) with self._cache_lock: - need_correctness_baseline = ( - baseline_key not in self._baseline_correctness_cache - ) + need_correctness_baseline = baseline_key not in self._baseline_correctness_cache if need_correctness_baseline: fb = _judge_with_fallback( self._trace_correctness_judge, @@ -403,9 +385,7 @@ def _judge_with_fallback( judge_name="completeness_with", ) with self._cache_lock: - need_completeness_baseline = ( - baseline_key not in self._baseline_completeness_cache - ) + need_completeness_baseline = baseline_key not in self._baseline_completeness_cache if need_completeness_baseline: fb = _judge_with_fallback( self._trace_completeness_judge, @@ -477,28 +457,14 @@ def _judge_with_fallback( without_assertion_results = run_all_assertions(without_response, expectations) fact_results = [r for r in with_assertion_results if r.assertion_type == "fact"] - pattern_results = [ - r for r in with_assertion_results if r.assertion_type == "pattern" - ] - fact_score = ( - sum(1 for r in fact_results if r.passed) / len(fact_results) - if fact_results - else 1.0 - ) - pattern_score = ( - sum(1 for r in pattern_results if r.passed) / len(pattern_results) - if pattern_results - else 1.0 - ) + pattern_results = [r for r in with_assertion_results if r.assertion_type == "pattern"] + fact_score = sum(1 for r in fact_results if r.passed) / len(fact_results) if fact_results else 1.0 + pattern_score = sum(1 for r in pattern_results if r.passed) / len(pattern_results) if pattern_results else 1.0 - failure_summary = summarize_failures( - with_assertion_results, without_assertion_results - ) + failure_summary = summarize_failures(with_assertion_results, without_assertion_results) # Phase 5: Deterministic trace scorers (static spine) - behavioral_score, behavioral_details = _run_behavioral_scorers( - with_trace, trace_expectations - ) + behavioral_score, behavioral_details = _run_behavioral_scorers(with_trace, trace_expectations) execution_success = _compute_execution_success(with_result) # Phase 6: Token efficiency @@ -517,9 +483,7 @@ def _judge_with_fallback( token_efficiency = 1.0 # Composite score: trace judges subsume tool_correctness + behavioral - quality_composite = ( - correctness_with + completeness_with + guideline_adherence_score - ) / 3.0 + quality_composite = (correctness_with + completeness_with + guideline_adherence_score) / 3.0 assertion_coverage = 0.5 * fact_score + 0.5 * pattern_score final_score = max( @@ -586,13 +550,9 @@ def _judge_with_fallback( # Assertion-based structured feedback side_info["Missing_Facts"] = [r.rationale for r in fact_results if not r.passed] - side_info["Missing_Patterns"] = [ - r.rationale for r in pattern_results if not r.passed - ] + side_info["Missing_Patterns"] = [r.rationale for r in pattern_results if not r.passed] side_info["Passed_Facts"] = [r.rationale for r in fact_results if r.passed] - side_info["Passed_Patterns"] = [ - r.rationale for r in pattern_results if r.passed - ] + side_info["Passed_Patterns"] = [r.rationale for r in pattern_results if r.passed] if failure_summary.get("Error") or failure_summary.get("Regressions"): side_info["skill_md_specific_info"] = { @@ -645,9 +605,7 @@ def _judge_with_fallback( side_info["token_counts"]["budget"] = self._token_budget # Diagnostic labels - weakest_dim = ( - "correctness" if correctness_with <= completeness_with else "completeness" - ) + weakest_dim = "correctness" if correctness_with <= completeness_with else "completeness" weakest_score = min(correctness_with, completeness_with) if failure_summary.get("Error"): @@ -658,11 +616,7 @@ def _judge_with_fallback( regressed_dims.append(f"correctness({correctness_delta:+.2f})") if completeness_delta < -0.05: regressed_dims.append(f"completeness({completeness_delta:+.2f})") - dims_str = ( - ", ".join(regressed_dims) - if regressed_dims - else f"overall({effectiveness_delta:+.2f})" - ) + dims_str = ", ".join(regressed_dims) if regressed_dims else f"overall({effectiveness_delta:+.2f})" side_info["Error"] = ( f"REGRESSION: {dims_str}. " f"correctness: {correctness_with:.2f} (was {correctness_without:.2f}), " @@ -704,9 +658,7 @@ def create_agent_evaluator( skill_guidelines = _collect_skill_guidelines(skill_name) if skill_guidelines: - logger.info( - "Loaded %d domain guidelines for agent trace judges", len(skill_guidelines) - ) + logger.info("Loaded %d domain guidelines for agent trace judges", len(skill_guidelines)) return AgentEvaluator( original_token_counts=original_token_counts, @@ -737,9 +689,7 @@ def build_agent_eval_background( baseline_desc = "" if baseline_scores: mean_score = sum(baseline_scores.values()) / len(baseline_scores) - baseline_desc = ( - f"\nBASELINE: mean {mean_score:.3f} across {len(baseline_scores)} tasks." - ) + baseline_desc = f"\nBASELINE: mean {mean_score:.3f} across {len(baseline_scores)} tasks." if baseline_side_info: needs_skill_ids = [] @@ -754,9 +704,7 @@ def build_agent_eval_background( behavioral = info.get("behavioral_scores", {}) for scorer_name, result in behavioral.items(): if result.get("value") == "no": - tool_issues.append( - f"{tid}: {scorer_name} - {result.get('rationale', '')[:80]}" - ) + tool_issues.append(f"{tid}: {scorer_name} - {result.get('rationale', '')[:80]}") if needs_skill_ids: baseline_desc += f"\n NEEDS_SKILL ({len(needs_skill_ids)} tasks): {', '.join(needs_skill_ids[:5])}" diff --git a/.test/src/skill_test/optimize/eval_criteria.py b/.test/src/skill_test/optimize/eval_criteria.py index 050d33ad..04242501 100644 --- a/.test/src/skill_test/optimize/eval_criteria.py +++ b/.test/src/skill_test/optimize/eval_criteria.py @@ -92,11 +92,7 @@ def filter_by_modules(self, tool_modules: list[str]) -> "EvalCriteriaSet": Criteria with empty ``applies_to`` are always included (general-purpose). """ - filtered = [ - s - for s in self.skills - if not s.applies_to or any(m in s.applies_to for m in tool_modules) - ] + filtered = [s for s in self.skills if not s.applies_to or any(m in s.applies_to for m in tool_modules)] result = EvalCriteriaSet.__new__(EvalCriteriaSet) result.skills = filtered result._by_name = {s.name: s for s in filtered} @@ -134,8 +130,7 @@ def _to_markdown(self) -> str: lines.append(f"- **{s.name}**: {s.description}") lines.append("") lines.append( - "Use the read_eval_criteria tool to load relevant criteria. " - "Use read_eval_reference for detailed rubrics." + "Use the read_eval_criteria tool to load relevant criteria. Use read_eval_reference for detailed rubrics." ) return "\n".join(lines) @@ -184,9 +179,7 @@ def discover_eval_criteria( if not base.is_dir(): logger.debug("Eval criteria directory not found: %s", base) return EvalCriteriaSet([]) - paths = sorted( - d for d in base.iterdir() if d.is_dir() and (d / "SKILL.md").exists() - ) + paths = sorted(d for d in base.iterdir() if d.is_dir() and (d / "SKILL.md").exists()) if paths: logger.info( "Discovered %d eval criteria: %s", diff --git a/.test/src/skill_test/optimize/judges.py b/.test/src/skill_test/optimize/judges.py index 66db622e..019bc16b 100644 --- a/.test/src/skill_test/optimize/judges.py +++ b/.test/src/skill_test/optimize/judges.py @@ -42,9 +42,7 @@ logger = logging.getLogger(__name__) -DEFAULT_JUDGE_LM = os.environ.get( - "GEPA_JUDGE_LM", "databricks:/databricks-claude-sonnet-4-6" -) +DEFAULT_JUDGE_LM = os.environ.get("GEPA_JUDGE_LM", "databricks:/databricks-claude-sonnet-4-6") # --------------------------------------------------------------------------- # Fallback model chain for rate limit errors @@ -126,9 +124,7 @@ def _to_litellm_model(model: str) -> tuple[str, str | None, str | None]: if gateway and model.startswith("databricks/"): # Route through AI Gateway as OpenAI-compatible endpoint endpoint_name = model.split("/", 1)[1] - api_key = os.environ.get("DATABRICKS_TOKEN") or os.environ.get( - "DATABRICKS_API_KEY", "" - ) + api_key = os.environ.get("DATABRICKS_TOKEN") or os.environ.get("DATABRICKS_API_KEY", "") return f"openai/{endpoint_name}", gateway, api_key or None return model, None, None @@ -156,9 +152,7 @@ def _judge_inference_params() -> dict[str, Any] | None: """Build inference_params for make_judge if AI Gateway is configured.""" gateway = _get_gateway_base_url() if gateway: - api_key = os.environ.get("DATABRICKS_TOKEN") or os.environ.get( - "DATABRICKS_API_KEY", "" - ) + api_key = os.environ.get("DATABRICKS_TOKEN") or os.environ.get("DATABRICKS_API_KEY", "") params: dict[str, Any] = {"base_url": gateway} if api_key: params["api_key"] = api_key @@ -180,9 +174,7 @@ def _to_judge_model_and_params(model: str) -> tuple[str, dict[str, Any] | None]: endpoint_name = model.split(":/", 1)[1] else: endpoint_name = model.split("/", 1)[1] - api_key = os.environ.get("DATABRICKS_TOKEN") or os.environ.get( - "DATABRICKS_API_KEY", "" - ) + api_key = os.environ.get("DATABRICKS_TOKEN") or os.environ.get("DATABRICKS_API_KEY", "") params: dict[str, Any] = {"base_url": gateway} if api_key: params["api_key"] = api_key @@ -369,20 +361,12 @@ def create_correctness_judge( instructions = _CORRECTNESS_INSTRUCTIONS if skill_guidelines: # Filter for correctness-related guidelines - filtered = [ - g - for g in skill_guidelines - if any(kw in g.lower() for kw in _CORRECTNESS_KEYWORDS) - ] + filtered = [g for g in skill_guidelines if any(kw in g.lower() for kw in _CORRECTNESS_KEYWORDS)] if filtered: principles = "\n".join(f"- {g}" for g in filtered) - instructions += ( - f"\n\n## Domain-Specific Correctness Principles\n{principles}\n" - ) + instructions += f"\n\n## Domain-Specific Correctness Principles\n{principles}\n" - model_uri, inference_params = _to_judge_model_and_params( - judge_model or DEFAULT_JUDGE_LM - ) + model_uri, inference_params = _to_judge_model_and_params(judge_model or DEFAULT_JUDGE_LM) return make_judge( name="skill_correctness", model=model_uri, @@ -442,9 +426,7 @@ def create_completeness_judge( Args: judge_model: LLM model for the judge. """ - model_uri, inference_params = _to_judge_model_and_params( - judge_model or DEFAULT_JUDGE_LM - ) + model_uri, inference_params = _to_judge_model_and_params(judge_model or DEFAULT_JUDGE_LM) return make_judge( name="skill_completeness", model=model_uri, @@ -514,9 +496,7 @@ def create_guideline_adherence_judge( principles = "\n".join(f"- {g}" for g in skill_guidelines) instructions += f"\n\n## Required Guidelines\n{principles}\n" - model_uri, inference_params = _to_judge_model_and_params( - judge_model or DEFAULT_JUDGE_LM - ) + model_uri, inference_params = _to_judge_model_and_params(judge_model or DEFAULT_JUDGE_LM) return make_judge( name="skill_guideline_adherence", model=model_uri, @@ -569,9 +549,7 @@ def create_regression_judge(judge_model: str | None = None) -> Any: judge_model: LLM model for the judge. Defaults to GEPA_JUDGE_LM env or databricks/databricks-claude-sonnet-4-6. """ - model_uri, inference_params = _to_judge_model_and_params( - judge_model or DEFAULT_JUDGE_LM - ) + model_uri, inference_params = _to_judge_model_and_params(judge_model or DEFAULT_JUDGE_LM) return make_judge( name="skill_regression", model=model_uri, @@ -630,9 +608,7 @@ def _call_judge(j): fb = future.result(timeout=timeout) except concurrent.futures.TimeoutError: logger.warning("Judge '%s' timed out after %ds", name, timeout) - return JudgeFeedback( - value=0.0, rationale=f"Judge timed out after {timeout}s", name=name - ) + return JudgeFeedback(value=0.0, rationale=f"Judge timed out after {timeout}s", name=name) finally: # shutdown(wait=False) so a still-running judge thread doesn't block pool.shutdown(wait=False) @@ -643,9 +619,7 @@ def _call_judge(j): ) except concurrent.futures.TimeoutError: # Already handled above, but keep for safety - return JudgeFeedback( - value=0.0, rationale=f"Judge timed out after {timeout}s", name=name - ) + return JudgeFeedback(value=0.0, rationale=f"Judge timed out after {timeout}s", name=name) except Exception as e: pool.shutdown(wait=False) if not _is_rate_limit_error(e): @@ -680,9 +654,7 @@ def _call_judge(j): continue finally: fb_pool.shutdown(wait=False) - logger.info( - "Judge '%s' succeeded with fallback model '%s'", name, fallback_model - ) + logger.info("Judge '%s' succeeded with fallback model '%s'", name, fallback_model) return JudgeFeedback( value=fb.value, rationale=fb.rationale or "", @@ -690,9 +662,7 @@ def _call_judge(j): ) except Exception as fallback_err: if _is_rate_limit_error(fallback_err): - logger.warning( - "Fallback '%s' also rate limited, trying next", fallback_model - ) + logger.warning("Fallback '%s' also rate limited, trying next", fallback_model) continue logger.warning("Fallback '%s' failed: %s", fallback_model, fallback_err) continue @@ -838,25 +808,15 @@ def create_trace_correctness_judge( CLI flag, ``GEPA_JUDGE_LM`` env var, or default. """ criteria_block = eval_criteria.to_prompt(judge_model) if eval_criteria else "" - instructions = ( - _TRACE_CORRECTNESS_INSTRUCTIONS_PREFIX - + criteria_block - + _TRACE_CORRECTNESS_INSTRUCTIONS_BODY - ) + instructions = _TRACE_CORRECTNESS_INSTRUCTIONS_PREFIX + criteria_block + _TRACE_CORRECTNESS_INSTRUCTIONS_BODY if skill_guidelines: - filtered = [ - g - for g in skill_guidelines - if any(kw in g.lower() for kw in _CORRECTNESS_KEYWORDS) - ] + filtered = [g for g in skill_guidelines if any(kw in g.lower() for kw in _CORRECTNESS_KEYWORDS)] if filtered: principles = "\n".join(f"- {g}" for g in filtered) instructions += f"\n\n## Domain Correctness Principles\n{principles}\n" - model_uri, inference_params = _to_judge_model_and_params( - judge_model or DEFAULT_JUDGE_LM - ) + model_uri, inference_params = _to_judge_model_and_params(judge_model or DEFAULT_JUDGE_LM) return make_judge( name="trace_correctness", model=model_uri, @@ -879,15 +839,9 @@ def create_trace_completeness_judge( judge_model: LLM model for the judge. """ criteria_block = eval_criteria.to_prompt(judge_model) if eval_criteria else "" - instructions = ( - _TRACE_COMPLETENESS_INSTRUCTIONS_PREFIX - + criteria_block - + _TRACE_COMPLETENESS_INSTRUCTIONS_BODY - ) + instructions = _TRACE_COMPLETENESS_INSTRUCTIONS_PREFIX + criteria_block + _TRACE_COMPLETENESS_INSTRUCTIONS_BODY - model_uri, inference_params = _to_judge_model_and_params( - judge_model or DEFAULT_JUDGE_LM - ) + model_uri, inference_params = _to_judge_model_and_params(judge_model or DEFAULT_JUDGE_LM) return make_judge( name="trace_completeness", model=model_uri, @@ -914,19 +868,13 @@ def create_trace_guideline_judge( judge_model: LLM model for the judge. """ criteria_block = eval_criteria.to_prompt(judge_model) if eval_criteria else "" - instructions = ( - _TRACE_GUIDELINE_INSTRUCTIONS_PREFIX - + criteria_block - + _TRACE_GUIDELINE_INSTRUCTIONS_BODY - ) + instructions = _TRACE_GUIDELINE_INSTRUCTIONS_PREFIX + criteria_block + _TRACE_GUIDELINE_INSTRUCTIONS_BODY if skill_guidelines: principles = "\n".join(f"- {g}" for g in skill_guidelines) instructions += f"\n\n## Required Guidelines\n{principles}\n" - model_uri, inference_params = _to_judge_model_and_params( - judge_model or DEFAULT_JUDGE_LM - ) + model_uri, inference_params = _to_judge_model_and_params(judge_model or DEFAULT_JUDGE_LM) return make_judge( name="trace_guideline_adherence", model=model_uri, diff --git a/.test/src/skill_test/optimize/runner.py b/.test/src/skill_test/optimize/runner.py index 75729e78..de97ad48 100644 --- a/.test/src/skill_test/optimize/runner.py +++ b/.test/src/skill_test/optimize/runner.py @@ -90,12 +90,8 @@ def _compute_diff_summary(original: str, optimized: str) -> str: if not diff: return "No changes" - added = sum( - 1 for line in diff if line.startswith("+") and not line.startswith("+++") - ) - removed = sum( - 1 for line in diff if line.startswith("-") and not line.startswith("---") - ) + added = sum(1 for line in diff if line.startswith("+") and not line.startswith("+++")) + removed = sum(1 for line in diff if line.startswith("-") and not line.startswith("---")) parts = [] if added: @@ -105,11 +101,7 @@ def _compute_diff_summary(original: str, optimized: str) -> str: changed_sections = set() for line in diff: - content = ( - line[1:].strip() - if line.startswith(("+", "-")) and not line.startswith(("+++", "---")) - else "" - ) + content = line[1:].strip() if line.startswith(("+", "-")) and not line.startswith(("+++", "---")) else "" if content.startswith("#"): changed_sections.add(content) @@ -121,9 +113,7 @@ def _compute_diff_summary(original: str, optimized: str) -> str: return summary -def _evaluate_on_tasks( - evaluator, candidate, tasks, label: str = "Evaluating", max_parallel: int = 1 -): +def _evaluate_on_tasks(evaluator, candidate, tasks, label: str = "Evaluating", max_parallel: int = 1): """Run evaluator on tasks and return mean score, per-task scores, and per-task side_info. Args: @@ -169,9 +159,7 @@ def _eval_task(idx, inst, task_id): try: for future in concurrent.futures.as_completed(futures, timeout=900): try: - idx, task_id, inst, score, side_info = future.result( - timeout=900 - ) + idx, task_id, inst, score, side_info = future.result(timeout=900) except Exception as e: idx = futures[future] task_id = tasks[idx].get("id", f"task_{idx}") @@ -204,16 +192,10 @@ def _eval_task(idx, inst, task_id): "scores": {"final": 0.0}, }, ) - side_info_by_input.setdefault( - inst.get("input", f"task_{idx}"), side_info_by_id[task_id] - ) + side_info_by_input.setdefault(inst.get("input", f"task_{idx}"), side_info_by_id[task_id]) future.cancel() - logger.warning( - "Task %s timed out in as_completed (900s)", task_id - ) - print( - f"\n WARNING: {label} timed out after 900s — scoring remaining tasks as 0.0" - ) + logger.warning("Task %s timed out in as_completed (900s)", task_id) + print(f"\n WARNING: {label} timed out after 900s — scoring remaining tasks as 0.0") pool.shutdown(wait=True) except Exception: pool.shutdown(wait=False) @@ -412,9 +394,7 @@ def optimize_skill( # Auto-derive AI Gateway URL from ANTHROPIC_BASE_URL if not explicitly set if not os.environ.get("DATABRICKS_AI_GATEWAY_URL"): - _anthropic_base = _agent_env.get("ANTHROPIC_BASE_URL", "") or os.environ.get( - "ANTHROPIC_BASE_URL", "" - ) + _anthropic_base = _agent_env.get("ANTHROPIC_BASE_URL", "") or os.environ.get("ANTHROPIC_BASE_URL", "") if "ai-gateway.cloud.databricks.com" in _anthropic_base: from urllib.parse import urlparse @@ -449,9 +429,7 @@ def optimize_skill( # Build read-only tool context string (for skill optimization) if tool_components: - tool_context_str = "\n\n".join( - tool_components[k] for k in sorted(tool_components) - ) + tool_context_str = "\n\n".join(tool_components[k] for k in sorted(tool_components)) # 2. Build seed_candidate (multi-component dict) seed_candidate: dict[str, str] = {} @@ -484,17 +462,11 @@ def optimize_skill( # 3. Load datasets if tools_only: # Cross-skill dataset for tool optimization - train = create_cross_skill_dataset( - max_per_skill=max_per_skill or 5, tool_modules=tool_modules - ) + train = create_cross_skill_dataset(max_per_skill=max_per_skill or 5, tool_modules=tool_modules) val = None if train: - source_skills = { - t.get("metadata", {}).get("source_skill", "?") for t in train - } - print( - f"Cross-skill dataset: {len(train)} tasks from {len(source_skills)} skill(s)" - ) + source_skills = {t.get("metadata", {}).get("source_skill", "?") for t in train} + print(f"Cross-skill dataset: {len(train)} tasks from {len(source_skills)} skill(s)") else: # Fall back to single-skill dataset try: @@ -526,9 +498,7 @@ def optimize_skill( if records: assessment_summary = summarize_assessment_patterns(records) assessment_by_task = match_assessments_to_tasks(records, train) - print( - f"MLflow assessments: {len(records)} traces, {len(assessment_by_task)} tasks matched" - ) + print(f"MLflow assessments: {len(records)} traces, {len(assessment_by_task)} tasks matched") if assessment_summary: print(f" {assessment_summary.splitlines()[0]}") else: @@ -546,9 +516,7 @@ def optimize_skill( print("Evaluator: skillbench (judge-driven)") if not effective_gen_model: - raise ValueError( - "SkillBench evaluator requires a gen_model. Pass --gen-model or set GEPA_GEN_LM env var." - ) + raise ValueError("SkillBench evaluator requires a gen_model. Pass --gen-model or set GEPA_GEN_LM env var.") evaluator = create_skillbench_evaluator( skill_name, gen_model=effective_gen_model, @@ -602,9 +570,7 @@ def optimize_skill( evaluator = agent_evaluator print("Mode: agent-eval-full (agent for ALL iterations)") else: - print( - "Mode: agent-eval hybrid (proxy for GEPA, agent for baseline + validation)" - ) + print("Mode: agent-eval hybrid (proxy for GEPA, agent for baseline + validation)") # Determine parallelism for evaluator calls (agent evaluator only) _eval_max_parallel = parallel_agents if agent_eval_full else 1 @@ -674,9 +640,7 @@ def _refiner_lm_with_fallback(prompt): for comp, tokens in original_token_counts.items(): print(f" {comp}: {tokens:,} tokens") if tool_context_str: - print( - f"Tool context (read-only): {count_tokens(tool_context_str):,} tokens" - ) + print(f"Tool context (read-only): {count_tokens(tool_context_str):,} tokens") print(f"Train tasks: {len(train)}") print(f"Val tasks: {len(val) if val else 'None (single-task mode)'}") print(f"Generation model: {effective_gen_model}") @@ -718,14 +682,12 @@ def _refiner_lm_with_fallback(prompt): dry_run_agent_si = None if agent_evaluator: print(f"\nAgent baseline ({len(train)} tasks)...") - dry_run_agent_score, agent_per_task, dry_run_agent_si, _ = ( - _evaluate_on_tasks( - agent_evaluator, - seed_candidate, - train, - label="Agent baseline", - max_parallel=parallel_agents, - ) + dry_run_agent_score, agent_per_task, dry_run_agent_si, _ = _evaluate_on_tasks( + agent_evaluator, + seed_candidate, + train, + label="Agent baseline", + max_parallel=parallel_agents, ) print(f"Agent baseline score: {dry_run_agent_score:.3f}") for task_id, score in agent_per_task.items(): @@ -823,14 +785,12 @@ def _refiner_lm_with_fallback(prompt): # 6b. Agent baseline scoring (hybrid mode: before GEPA loop) if agent_evaluator and not agent_eval_full: print(f"\n Agent baseline scoring ({len(train)} tasks)...") - agent_baseline_score, agent_baseline_per_task, agent_baseline_si, _ = ( - _evaluate_on_tasks( - agent_evaluator, - seed_candidate, - train, - label="Agent baseline", - max_parallel=parallel_agents, - ) + agent_baseline_score, agent_baseline_per_task, agent_baseline_si, _ = _evaluate_on_tasks( + agent_evaluator, + seed_candidate, + train, + label="Agent baseline", + max_parallel=parallel_agents, ) print(f" Agent baseline score: {agent_baseline_score:.3f}") for task_id, score in agent_baseline_per_task.items(): @@ -855,11 +815,7 @@ def _refiner_lm_with_fallback(prompt): ) # estimate_pass_duration expects the model name string, not the callable - _est_reflection_lm = ( - _reflection_model_name - if _reflection_model_name - else str(reflection_lm or DEFAULT_GEN_LM) - ) + _est_reflection_lm = _reflection_model_name if _reflection_model_name else str(reflection_lm or DEFAULT_GEN_LM) est_secs = estimate_pass_duration( config.engine.max_metric_calls, _est_reflection_lm, @@ -874,9 +830,7 @@ def _refiner_lm_with_fallback(prompt): ) for pass_num in range(1, max_passes + 1): - print( - f"\n --- Pass {pass_num}/{max_passes} (best score so far: {best_score:.4f}) ---" - ) + print(f"\n --- Pass {pass_num}/{max_passes} (best score so far: {best_score:.4f}) ---") pass_config = copy.deepcopy(config) @@ -905,9 +859,7 @@ def _refiner_lm_with_fallback(prompt): ) improvement = pass_score - best_score - print( - f" Pass {pass_num} score: {pass_score:.4f} (delta: {'+' if improvement >= 0 else ''}{improvement:.4f})" - ) + print(f" Pass {pass_num} score: {pass_score:.4f} (delta: {'+' if improvement >= 0 else ''}{improvement:.4f})") if pass_score > best_score + improvement_threshold: best = dict(candidate) @@ -956,14 +908,12 @@ def _refiner_lm_with_fallback(prompt): if agent_evaluator and not agent_eval_full: print(f"\n Agent validation scoring ({len(train)} tasks on best candidate)...") - agent_validation_score, agent_val_per_task, agent_validation_si, _ = ( - _evaluate_on_tasks( - agent_evaluator, - best, - train, - label="Agent validation", - max_parallel=parallel_agents, - ) + agent_validation_score, agent_val_per_task, agent_validation_si, _ = _evaluate_on_tasks( + agent_evaluator, + best, + train, + label="Agent validation", + max_parallel=parallel_agents, ) print(f" Agent validation score: {agent_validation_score:.3f}") for task_id, score in agent_val_per_task.items(): From 53ed7bdaab2b39d4fb830fec26705d44a94fa8ef Mon Sep 17 00:00:00 2001 From: calreynolds Date: Mon, 16 Mar 2026 16:49:12 -0400 Subject: [PATCH 6/6] style: format test_sql_output_format.py from merged PR #297 Co-authored-by: Isaac --- databricks-mcp-server/tests/test_sql_output_format.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/databricks-mcp-server/tests/test_sql_output_format.py b/databricks-mcp-server/tests/test_sql_output_format.py index 5dadb5de..9b678abd 100644 --- a/databricks-mcp-server/tests/test_sql_output_format.py +++ b/databricks-mcp-server/tests/test_sql_output_format.py @@ -73,6 +73,4 @@ def test_markdown_smaller_than_json(self): md = _format_results_markdown(rows) js = json.dumps(rows) # Markdown should be at least 30% smaller - assert len(md) < len(js) * 0.7, ( - f"Markdown ({len(md)} chars) should be <70% of JSON ({len(js)} chars)" - ) + assert len(md) < len(js) * 0.7, f"Markdown ({len(md)} chars) should be <70% of JSON ({len(js)} chars)"