refactor: Replace ProcessPoolExecutor with asyncio for evaluation #446
simonrosenberg wants to merge 14 commits into main from
Conversation
This refactors the evaluation orchestrator from ProcessPoolExecutor to asyncio with semaphore-based concurrency. It eliminates the 30× memory multiplication from having 30 worker processes, which was causing OOM failures in long-running SWTBench evaluations.

Changes:
- Use asyncio.Semaphore to limit concurrent instances
- Run sync SDK operations (workspace, conversation) via asyncio.to_thread()
- Single-process async concurrency for the I/O-bound workload
- Update tests for the new architecture
- Add pytest-asyncio for async test support

The workload is I/O-bound (HTTP calls to the LLM proxy and runtime API), so single-process async concurrency provides the same parallelism without the memory overhead of separate Python processes.

Fixes #441

Co-authored-by: openhands <openhands@all-hands.dev>
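The pattern described above can be sketched roughly as follows. This is an illustrative sketch only, not the actual orchestrator code; `NUM_WORKERS`, `blocking_evaluate`, and `run_all` are hypothetical names, and the real run uses ~30 workers.

```python
import asyncio
import time

NUM_WORKERS = 3  # stand-in for the real worker count (~30)

def blocking_evaluate(instance_id: int) -> dict:
    # Stand-in for the sync SDK work (workspace setup, conversation, HTTP I/O).
    time.sleep(0.01)
    return {"instance_id": instance_id, "ok": True}

async def run_all(instance_ids: list[int]) -> list[dict]:
    sem = asyncio.Semaphore(NUM_WORKERS)

    async def run_one(iid: int) -> dict:
        # Only NUM_WORKERS instances hold the semaphore at once; the rest
        # wait here as cheap coroutines instead of separate processes.
        async with sem:
            # Blocking sync work runs in a thread so the event loop stays free.
            return await asyncio.to_thread(blocking_evaluate, iid)

    return await asyncio.gather(*(run_one(i) for i in instance_ids))
```

All tasks exist up front, but the semaphore caps how many are actually executing, which is what replaces the fixed process pool.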
The asyncio refactor runs sync SDK operations in thread workers via
asyncio.to_thread(). Two thread-safety bugs caused crashes:
1. redirect_stdout_stderr() replaced the global sys.stdout with a log
file and restored it in a finally block. When multiple threads ran
concurrently, one thread closing its log file would crash others
still writing to it ("I/O operation on closed file").
Fix: Use threading.local() to store per-thread log files and a
_ThreadLocalWriter wrapper that delegates writes to the correct
file for each thread. The wrapper also catches ValueError and
falls back to the original stream.
2. os.environ["LMNR_SPAN_CONTEXT"] was written without
synchronisation, allowing one thread to overwrite another's span
context before it was consumed.
Fix: Protect the write with a threading.Lock().
Also: remove duplicate @pytest.mark.asyncio decorator, remove unused
sys import, run ruff format.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Three bugs caused 275/433 instances to produce no results:

1. Timeout starts at task creation, not semaphore acquisition (204 lost)
   All 433 asyncio tasks are created at once but only 30 run concurrently (semaphore). Instances queued behind the semaphore burned through their 4h timeout while waiting.
   Fix: Reset start_time to time.monotonic() inside the semaphore context, so the timeout counts from when the instance actually begins running.

2. lmnr_span.end() raises ValueError in worker threads (37 lost)
   Laminar spans use contextvars, which don't work across threads. lmnr_span.end() in the finally block raised a "Failed to detach context" ValueError, crashing the thread and discarding the return value (including error outputs from exhausted retries).
   Fix: Wrap lmnr_span.end() in try/except.

3. Generic Exception handler drops instances silently (exacerbates #2)
   When task.result() raised from bug #2, the except handler logged the error but did not create an error output or call on_result.
   Fix: Create error_output and call on_result in the generic Exception handler.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
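The fix for bug 1 amounts to starting the timeout clock only after the semaphore is held, along these lines (an illustrative sketch, not the actual orchestrator code; `run_one` and its parameters are hypothetical):

```python
import asyncio
import time

async def run_one(sem: asyncio.Semaphore, timeout_s: float, work) -> dict:
    # Queue time (waiting for the semaphore) is unbounded and must not
    # count against the instance's own timeout.
    async with sem:
        start_time = time.monotonic()  # clock starts at actual execution
        result = await asyncio.wait_for(work(), timeout=timeout_s)
        return {"result": result, "elapsed": time.monotonic() - start_time}
```

With `asyncio.wait_for` placed inside the `async with sem:` block, an instance that waits hours in the queue still gets its full budget once it starts.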
setup_instance_logging() modified the root logger's handlers per call, removing all existing handlers and adding new instance-specific ones. In a multi-threaded environment (asyncio.to_thread workers), thread B would remove thread A's file handler, causing thread A's logs to go to thread B's file or be lost entirely.

Fix: Install a single pair of routing handlers on the root logger that delegate to per-thread state via threading.local():
- _ThreadRoutedFileHandler: routes records to the FileHandler stored in the current thread's _logging_local.file_handler
- _ThreadRoutedConsoleHandler: applies the formatter/filter stored in the current thread's _logging_local for console output

The routing handlers are installed once (protected by a lock), and subsequent calls to setup_instance_logging() only update the thread-local state.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
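The routing idea can be sketched as follows (simplified; the commit's real class names are _ThreadRoutedFileHandler and _ThreadRoutedConsoleHandler, while `ThreadRoutedHandler` and `setup_instance_logging` here are illustrative stand-ins):

```python
import logging
import threading

_logging_local = threading.local()  # each thread stashes its own handler here

class ThreadRoutedHandler(logging.Handler):
    """Installed once on the root logger; delegates each record to the
    handler registered by the current thread (if any)."""

    def emit(self, record: logging.LogRecord) -> None:
        per_thread = getattr(_logging_local, "file_handler", None)
        if per_thread is not None:
            per_thread.emit(record)

# Installed once for the whole process:
root = logging.getLogger()
root.addHandler(ThreadRoutedHandler())
root.setLevel(logging.INFO)

# Each worker thread then only updates its own thread-local state:
def setup_instance_logging(handler: logging.Handler) -> None:
    _logging_local.file_handler = handler
```

Since the root logger's handler list is never mutated after startup, one thread can no longer rip out another thread's handler mid-write.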
- Set start_time=inf at task creation so instances waiting in the semaphore queue can't time out before they start running
- Add a fallback in the thread-routed logging handlers so main asyncio thread warnings/errors are written to stderr/stdout instead of being silently dropped
- Retry instances with no output at all in the previous attempt, not just critic-flagged failures

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Resolve conflicts: keep asyncio evaluation.py and console_logging.py from feature branch, take main's versions for all other files. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Force-pushed from 09c9fb1 to 863df2a
Resolve conflicts by keeping the asyncio architecture from the feature branch while incorporating improvements from main:
- Update the Laminar API to two-phase datapoint linking (create the datapoint immediately for the UI, link the trace when the worker starts)
- Add _cleanup_workspace helper method
- Add _execute_single_attempt method for cleaner retry logic
- Fix _get_instances_for_attempt to retry instances with no output in any prior attempt (never_completed set)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
The _ThreadRoutedConsoleHandler was writing WARNING+ level messages to sys.__stdout__ when no thread-local formatter was set (i.e., for the main asyncio event loop thread). This caused OpenTelemetry context detachment errors to be written to stdout, corrupting the JSON output that shell scripts parse with jq.

Changed the fallback to write to sys.__stderr__ instead, matching the behavior of _ThreadRoutedFileHandler. This fixes the "jq: parse error: Invalid numeric literal" failures in the gaia, swebench, and swtbench benchmarks.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This looks interesting! Testing it with GLM-5 and swebenchmultimodal, which has some deadlocks. https://github.com/OpenHands/software-agent-sdk/actions/runs/22772705315
juanmichelini
left a comment
Tested it with swebenchmultimodal GLM-5 (which works on main)
and it consistently gives the same error below.
Can you add links to successful runs?
Evaluation Failed
Evaluation: 22906539712-glm-5
Model: litellm_proxy/openrouter/z-ai/glm-5
Benchmark: swebenchmultimodal
Exit Code: 5
Log Tail:
File "/workspace/benchmarks/vendor/software-agent-sdk/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py", line 1031, in _handle_conversation_status
raise ConversationRunError(
openhands.sdk.conversation.exceptions.ConversationRunError: Conversation run failed for id=53b37d6f-d74f-48f6-93e1-2e70f68e0444: Remote conversation ended with error
2026-03-11 20:27:11,448 - ERROR - opentelemetry.context - Failed to detach context
Traceback (most recent call last):
File "/workspace/benchmarks/.venv/lib/python3.12/site-packages/opentelemetry/context/__init__.py", line 155, in detach
_RUNTIME_CONTEXT.detach(token)
File "/workspace/benchmarks/.venv/lib/python3.12/site-packages/opentelemetry/context/contextvars_context.py", line 53, in detach
self._current_context.reset(token)
ValueError: <Token var=<ContextVar name='current_context' default={} at 0x7ebd72de98f0> at 0x7ebd077059c0> was created in a different Context
ERROR: opentelemetry.context: Failed to detach context
2026-03-11 20:27:11,448 - ERROR - opentelemetry.context - Failed to detach context
Traceback (most recent call last):
File "/workspace/benchmarks/.venv/lib/python3.12/site-packages/opentelemetry/context/__init__.py", line 155, in detach
_RUNTIME_CONTEXT.detach(token)
File "/workspace/benchmarks/.venv/lib/python3.12/site-packages/opentelemetry/context/contextvars_context.py", line 53, in detach
self._current_context.reset(token)
ValueError: <Token var=<ContextVar name='current_context' default={} at 0x7ebd72de98f0> at 0x7ebd07510940> was created in a different Context
ERROR: opentelemetry.context: Failed to detach context
jq: parse error: Invalid numeric literal at line 1, column 4
Root cause of the swebenchmultimodal jq parse error (exit code 5): _ThreadRoutedConsoleHandler wrote formatted log messages to sys.__stdout__, which got captured by shell $() substitution. OpenTelemetry context-detach errors appearing after the JSON output corrupted the stream that run_swebenchmultimodal.sh pipes to jq.

Changes:
- Route ALL console handler output to sys.__stderr__ (matches main's approach from PR #497) to protect stdout for JSON output parsing
- Suppress the opentelemetry.context logger (CRITICAL level) to prevent harmless context-detach errors from leaking to any output stream
- Extract setup_routed_logging() to set up main-thread logging defaults explicitly, eliminating fallback paths in handlers
- Change PendingInstance.start_time to Optional[float] (None while queued) instead of a float("inf") sentinel value
- Replace _ThreadLocalWriter.__getattr__ with explicit properties (encoding, closed, isatty, fileno)
- Add integration tests: test_evaluation_run_end_to_end (4 instances with 1 failure through full Evaluation.run()) and test_evaluation_timeout_cancels_instance
- Fix test_workspace_cleanup.py to use _process_one_sync (renamed from _process_one_mp in the asyncio refactor)
- Add pytest-asyncio to dev dependencies with asyncio_mode = "auto"
- Update SDK submodule to latest main
- Rebase onto latest main

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Force-pushed from a2d9ac3 to 591b05b
…t module

Replace scattered thread-safety infrastructure (two threading.local() instances, three setup functions across two files) with a single worker_context.py module exposing initialize() and instance_context().

- Create benchmarks/utils/worker_context.py with a unified _ctx threading.local
- Strip thread-routing code from console_logging.py (~175 lines removed)
- Simplify evaluation.py by replacing 3 setup calls with one context manager
- Update test_keyboard_interrupt to check threads instead of child processes
- Auto-format files to satisfy pre-commit

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
all-hands-bot
left a comment
Taste Rating: 🟡 Acceptable
The core refactor addresses a real production problem (30× memory from worker processes causing OOM). The asyncio approach is fundamentally sound. However, the solution trades process memory for code complexity in the logging infrastructure. The new worker_context.py module adds significant indirection that could be simpler.
Positive: Integration tests were added since the last review, addressing the critical testing gap.
Concern: We're routing everything through thread-local getattr() chains and swallowing exceptions silently in multiple places. This feels like fighting Python's threading model rather than working with it.
    stores its own FileHandler in _ctx.file_handler, and this handler
    delegates to whichever FileHandler the current thread has.
    """
🟠 Important - Indirection Chain: This handler uses getattr(_ctx, "file_handler", None) to route to per-thread handlers. Three separate routing classes (_RoutedFileHandler, _RoutedConsoleHandler, _ThreadLocalWriter) all follow this pattern.
Question: Is thread-local routing actually necessary here? Could we instead pass explicit logger/file objects to the worker function? The current design requires understanding contextvars, threading.local(), and the routing logic to debug logging issues.
Alternative approach: Pass a LogContext object explicitly to each worker thread instead of relying on thread-local magic. Makes data flow explicit.
Thread-local routing is necessary here because:
- Third-party code logs too — the SDK, OpenTelemetry, litellm, and other deps all call logging.getLogger() and get the same root logger. We can't pass explicit loggers to those libraries.
- stdout/stderr are process-global — print() calls from SDK code, tqdm, and third-party libs write to sys.stdout. The only way to route per-thread is to replace sys.stdout with a thread-local writer.
An explicit LogContext object would work for our own code, but wouldn't capture output from the ~20+ transitive dependencies that also log/print during evaluation. The thread-local approach is the standard pattern for this (same as what Django uses for locale.activate() per-request).
That said, the consolidation into a single _ctx with one module makes the routing much easier to follow than the previous scattered implementation.
        return
    record_msg = self.format(record)
    try:
        fh.stream.write(record_msg + "\n")
🔴 Critical - Silent Failure: except Exception: pass swallows ALL exceptions during file write. If the file handler is closed, disk is full, or permissions changed, logs silently disappear.
Better approach:
try:
fh.stream.write(record_msg + "\n")
fh.stream.flush()
except (OSError, ValueError) as e:
# Log to stderr as fallback so we know logging failed
sys.__stderr__.write(f"[LOGGING FAILED] {record_msg}\n")Logging infrastructure failures should not be invisible.
    # --- file-like API used by print() and the logging module ---------------

    def write(self, s: str) -> int:
        target = self._target()
🟡 Suggestion - Exception Handling: ValueError catch here handles "I/O operation on closed file" but the fallback writes to self._original which could also be closed.
try:
return target.write(s)
except (ValueError, OSError):
# If both streams fail, at least try stderr
try:
return self._original.write(s)
except Exception:
sys.__stderr__.write(s) # Last resort
benchmarks/utils/evaluation.py
    instance: EvalInstance
    start_time: float
    start_time: Optional[float] = None
🟡 Suggestion - Sentinel Pattern: Using start_time: Optional[float] = None as a sentinel for "not started yet" was flagged in previous review. The code now has to check if st is not None before timeout calculation (line 587).
Cleaner approach: Don't track start_time in PendingInstance at all. Instead, store it in a separate dict only when the task actually starts:
active_start_times: dict[asyncio.Task, float] = {}
# When semaphore acquired:
active_start_times[task] = time.monotonic()
# Timeout check:
for task in pending:
start = active_start_times.get(task)
if start and now - start > timeout:
        timed_out.append(task)

Eliminates the None check entirely ("good taste" - no special case).
benchmarks/utils/evaluation.py
    if eval_span is not None:
        eval_span.end()
    try:
        eval_span.end()
🟠 Important - Silent Exception: Bare except Exception: pass with only a comment explaining why.
Issue: If span.end() fails for a DIFFERENT reason (network error, SDK bug), we'll never know. The comment says "contextvars tokens created in main thread cannot be detached from worker thread" but we're catching ALL exceptions.
try:
eval_span.end()
except RuntimeError as e:
# Expected: contextvars token detach from worker thread
if "context" not in str(e).lower():
logger.warning(f"Unexpected span.end() error: {e}")
except Exception as e:
    logger.warning(f"Unexpected span.end() error: {e}")

Catch the specific error we expect; log unexpected ones.
    Creates a TestEvaluation with mock workspaces and verifies that:
    - Multiple instances run concurrently via asyncio
    - Results are collected and written to attempt files
    - Errors produce error outputs (not lost instances)
🟢 Acceptable - Integration Test Added: This test creates a real Evaluation instance, runs multiple instances through the async path, verifies error handling, and checks output files.
This addresses the critical "Missing Integration Test" feedback from the previous review. The test exercises the actual evaluation flow, not just asyncio primitives.
Also good: test_evaluation_timeout_cancels_instance verifies timeout behavior end-to-end.
    # instance actually begins running, not from when it was queued.
    task = asyncio.current_task()
    if task and task in pending_instances:
        pending_instances[task].start_time = time.monotonic()
🟡 Suggestion - Design Smell: The comment "Reset start_time to NOW so timeout counts from when instance actually begins running" reveals we're working around the data structure instead of with it.
This is the continuation of the sentinel pattern issue. We're storing start_time in PendingInstance before the instance starts (set to None), then updating it later.
Root cause: PendingInstance is created before the task starts but needs different state after it starts. Consider splitting into two types:
@dataclass
class QueuedInstance:
instance: EvalInstance
task: asyncio.Task
datapoint_id: UUID | None
@dataclass
class RunningInstance(QueuedInstance):
    start_time: float  # Always set, never None

Move from queued → running when the semaphore is acquired. No None checks needed.
- Narrow bare `except Exception` to `except (OSError, ValueError)` in logging handlers, with a stderr fallback when the file handler fails
- Add sys.__stderr__ as a last-resort fallback in _ThreadLocalWriter
- Replace the PendingInstance.start_time sentinel with a separate active_start_times dict — eliminates None checks in the timeout logic
- Narrow the span.end() except to RuntimeError (the expected contextvars error) and log unexpected errors instead of silently swallowing them

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
all-hands-bot
left a comment
Taste Rating: 🔴 Blocking Issues + Over-Engineering
The core refactor (ProcessPoolExecutor → asyncio) solves a real production problem (30× memory multiplication causing OOM). The approach is theoretically sound for I/O-bound workloads.
However:
🔴 BLOCKING: Runtime Failures Reported
@juanmichelini reported consistent failures with GLM-5 that works on main:
"Tested it with swebenchmultimodal GLM-5 (which works on main) and it consistently gives the same error"
This must be debugged and fixed before merge. The PR description claims successful evaluation but provides no evidence (no links to successful runs, no logs proving GLM-5 works).
Required: Add an Evidence section to the PR description with:
- Links to successful evaluation runs (especially GLM-5)
- Complete logs showing the async refactor works end-to-end
- Comparison with main branch results to prove no regression
🔴 Critical: Silent Exception Handling Masks Bugs
benchmarks/utils/worker_context.py:63 - _RoutedFileHandler.emit():
except (OSError, ValueError):
# fall back to stderr so the message isn't silently lost.
if sys.__stderr__:
try:
sys.__stderr__.write(record_msg + "\n")
        except Exception:
            pass

Problem: If file writes fail (closed file, disk full), you fall back to stderr, then if THAT fails, you silently pass. The actual error is lost. This could be hiding the root cause of the GLM-5 failures.
Fix: Don't silently swallow exceptions. At minimum, log the failure and raise so we know something is broken.
benchmarks/utils/evaluation.py:762-770 and :899-910 - Span cleanup:
except RuntimeError:
# Expected: contextvars tokens created in the main
# thread cannot be detached from a worker thread.
pass
except Exception as e:
    logger.warning("[worker] Unexpected span.end() error: %s", e)

Problem: You're catching ALL RuntimeErrors assuming they're the "expected" contextvars issue. What if a different RuntimeError occurs that indicates a real bug?
Fix: Check the error message before ignoring:
except RuntimeError as e:
if "cannot be detached" not in str(e):
logger.error(f"[worker] Unexpected RuntimeError: {e}")
raise
    logger.debug(f"[worker] Ignored expected contextvars error")

🟡 Architecture: Over-Engineered Logging Infrastructure
benchmarks/utils/worker_context.py - 330 lines of thread-routing machinery:
- _RoutedFileHandler with getattr(_ctx, "file_handler", None) routing
- _RoutedConsoleHandler with getattr(_ctx, "console_formatter", None) routing
- _ThreadLocalWriter with getattr(_ctx, "log_file", None) routing
- Three layers of fallback exception handling
- Global initialization guard with locking
This is too much machinery for "each thread needs its own log file."
Fundamental question: Why do we need thread-local routing at all? Each thread could just open its own log file directly and configure its own logger. No routing. No threading.local(). No getattr() magic.
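For code you control, that alternative would look something like this (a hypothetical sketch; `make_instance_logger` is not from the PR, and as the author notes below, it would not capture output from third-party libraries that log to the root logger):

```python
import logging

def make_instance_logger(instance_id: str, log_path: str) -> logging.Logger:
    # A dedicated, non-propagating logger per instance: no threading.local,
    # no routing — but only code that receives this logger writes to it.
    lg = logging.getLogger(f"eval.{instance_id}")
    lg.setLevel(logging.INFO)
    lg.propagate = False  # keep records out of the shared root logger
    handler = logging.FileHandler(log_path)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    )
    lg.addHandler(handler)
    return lg
```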
If third-party code (SDK, OpenTelemetry) MUST use the root logger and you can't pass explicit loggers, then document WHY this complexity is unavoidable. Right now it looks like we're solving a problem we created.
"If you need more than 3 levels of indirection, you're doing it wrong." - This has multiple levels of getattr-based routing with fallback chains. That's a code smell.
🟠 Important: Fighting the Framework
The repeated except RuntimeError for "contextvars tokens cannot be detached from worker threads" indicates we're fighting Python's design.
This is a warning sign. If OpenTelemetry spans + contextvars don't work naturally with threads, maybe we're using the wrong concurrency model.
Alternatives to consider:
- Does OpenTelemetry have a thread-safe span API?
- Could we use ProcessPoolExecutor with worker recycling (solves memory issue without fighting contextvars)?
- Is the benefit of asyncio worth these workarounds?
🟡 Testing: Coverage Weakened
Good: test_async_evaluation.py includes integration tests that exercise real Evaluation instances.
Problem: Tests use mocks for workspace/LaminarService. They prove asyncio flow works, but not that it integrates correctly with the real SDK.
Given the reported GLM-5 failures, the mocked tests clearly aren't catching integration issues.
test_keyboard_interrupt.py: The old test verified worker processes were cleaned up. The new test just checks "process exits." Lost coverage: we don't verify cleanup behavior or that threads actually started.
Verdict: ❌ Needs Rework
Cannot merge until:
- ✅ Debug and fix the GLM-5 failures reported by @juanmichelini
- ✅ Add evidence section with successful run links
- ✅ Fix silent exception handling (at minimum, check error messages before ignoring)
- 💭 Consider simplifying the logging infrastructure (current approach is over-engineered)
Key Insight: You're trading 30× memory for code complexity and fighting Python's concurrency primitives. The core idea is sound, but the implementation has too many workarounds and hidden failure modes.
Addresses review feedback from #446:

1. Simplified worker_context.py from 332 to 213 lines (~36% reduction)
   - Renamed classes to simpler names (_FileHandler, _ConsoleHandler)
   - Removed fallback exception chains that swallowed errors silently
   - Replaced __getattr__ magic with explicit properties in _StdoutWrapper
   - Added a docstring explaining WHY thread-local routing is necessary (third-party code uses the root logger and sys.stdout)

2. Fixed silent exception handling in evaluation.py
   - span.end() RuntimeError now checks the error message before ignoring
   - Only ignores contextvars-related errors; logs all others
   - Distinguishes eval_span vs exec_span in log messages

3. Simplified instance_context()
   - Removed state restoration logic (cleanup just deletes attrs)
   - More linear flow, fewer special cases

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
all-hands-bot
left a comment
Taste Rating: 🟡 Acceptable with Critical Issues
The core refactor (ProcessPoolExecutor → asyncio) solves a real production problem (30× memory multiplication). However, there are blocking issues that must be addressed:
- Silent exception swallowing that could hide bugs
- No runtime evidence despite reported failures
- Logging infrastructure complexity that violates simplicity principles
benchmarks/utils/evaluation.py
    # Contextvars tokens from main thread can't detach in worker.
    # Only ignore this specific error, log others.
    if "context" not in str(e).lower():
        logger.warning("[worker] span.end() RuntimeError: %s", e)
🔴 Critical - Silent Exception Swallowing: This catches RuntimeError and only ignores the specific "context" error, but the fallback except Exception is too broad.
Issue: If span.end() fails for ANY other reason (network error, SDK bug, file system issue), you'll never know. The comment explains ONE failure mode, but the code silently swallows ALL exceptions.
Fix: Log unexpected exceptions at minimum:
try:
    eval_span.end()
except RuntimeError as e:
    # Contextvars tokens from main thread can't detach in worker.
    if "context" not in str(e).lower():
        logger.error("[worker] span.end() RuntimeError: %s", e, exc_info=True)
except Exception as e:
    logger.error("[worker] span.end() unexpected error: %s", e, exc_info=True)
This is not over-engineering - it's basic observability. Silent failures make debugging impossible.
    logger.warning("[worker] exec_span.end() error: %s", e)

    yield
🔴 Critical - Same Silent Swallowing Pattern: Duplicate of the issue above. If exec_span.end() fails for a DIFFERENT reason than the contextvars issue, you won't know.
try:
    exec_span.end()
except RuntimeError as e:
    if "context" not in str(e).lower():
        logger.error("[worker] exec_span.end() RuntimeError: %s", e, exc_info=True)
except Exception as e:
    logger.error("[worker] exec_span.end() unexpected error: %s", e, exc_info=True)
    fh.stream.flush()
except (OSError, ValueError) as e:
    # Write failed — log to stderr so we know something is wrong
    if sys.__stderr__:
🟠 Important - Fallback Still Fails: The except block handles (OSError, ValueError) but writes to sys.__stderr__ which could ALSO be None or closed.
Better approach - fail fast if logging is broken:
try:
    fh.stream.write(msg + "\n")
    fh.stream.flush()
except Exception as e:
    # Last-resort: try stderr, but don't swallow if both fail
    if sys.__stderr__:
        try:
            sys.__stderr__.write(f"[logging failed: {e}] {msg}\n")
        except Exception:
            pass  # Nothing left to do
If logging is completely broken, we want to know, not silently lose logs.
benchmarks/utils/worker_context.py
    Explicit properties instead of __getattr__ to avoid hiding bugs.
    """

    def __init__(self, original: TextIO) -> None:
        self._original = original

    def _target(self) -> TextIO:
        return getattr(_ctx, "output_file", None) or self._original

    def write(self, s: str) -> int:
        return self._target().write(s)

    def flush(self) -> None:
        self._target().flush()

    @property
    def encoding(self) -> str:
        return self._original.encoding

    @property
    def closed(self) -> bool:
        return self._original.closed
🟡 Suggestion - Over-Engineered Forwarding: You've implemented explicit properties for encoding, closed, isatty(), and fileno() to "avoid hiding bugs" per the comment on line 98.
Question: Which code actually calls these? If nothing uses isatty() or fileno() on the wrapped streams, this is premature.
Simpler approach - Only implement what's actually needed. If tqdm or SDK code breaks, THEN add the specific property. This is 20 lines that might not solve any real problem.
benchmarks/utils/worker_context.py
    _ctx.console_level = logging.WARNING


@contextmanager
def instance_context(log_dir: str, instance_id: str) -> Generator[None, None, None]:
    """Set up per-instance logging and output redirection for this thread.

    Args:
        log_dir: Directory for log files
        instance_id: The evaluation instance ID
    """
    initialize()

    log_path = os.path.join(log_dir, f"instance_{instance_id}.log")
    output_path = os.path.join(log_dir, f"instance_{instance_id}.output.log")
    short_id = (
        instance_id.split("__")[-1][:20] if "__" in instance_id else instance_id[:20]
    )
    rich_mode = _rich_logging_enabled()

    os.makedirs(log_dir, exist_ok=True)
    log_file = open(log_path, "a", encoding="utf-8")  # noqa: SIM115
    output_file = open(output_path, "a", buffering=1, encoding="utf-8")  # noqa: SIM115

    # Set thread-local state
    _ctx.file_handler = logging.FileHandler(log_path)
    _ctx.output_file = output_file
    if rich_mode:
        _ctx.console_formatter = _ColorFormatter(instance_id)
        _ctx.console_filter = _ConsoleFilter()
        _ctx.console_level = logging.INFO
    else:
        _ctx.console_formatter = _PlainFormatter(instance_id)
        _ctx.console_filter = None
        _ctx.console_level = logging.WARNING

    # Print startup message
    if rich_mode:
        if sys.__stderr__:
            sys.__stderr__.write(
                format_line(
                    short_id=short_id,
                    tag="START",
                    message=f"{instance_id} | Logs: {log_path}",
                    tag_bg=BG_BLUE,
                    message_color=CYAN_BRIGHT,
                    newline_before=True,
                )
                + "\n"
            )
            sys.__stderr__.flush()
    else:
        # Temporarily allow INFO for startup message
        _ctx.console_level = logging.INFO
        logging.getLogger().info(
            f"=== Evaluation Started (instance {instance_id}) ===\n"
            f"  • tail -f {log_path} (logger)\n"
            f"  • tail -f {output_path} (stdout/stderr)"
        )
        _ctx.console_level = logging.WARNING

    try:
        yield
    finally:
        # Clear thread-local state
        for attr in ("file_handler", "output_file", "console_formatter", "console_filter"):
            if hasattr(_ctx, attr):
                delattr(_ctx, attr)
        log_file.close()
        output_file.close()
There was a problem hiding this comment.
🟡 Suggestion - Complex Setup/Teardown: The instance_context() function does:
- Initialize logging (if needed)
- Open two files
- Set 4+ thread-local attributes
- Print startup message with rich/plain branching
- Temporarily change console_level for startup
- Clean up all thread-local attrs
- Close files
This is a lot of moving parts. The comment on line 7 says "thread-local routing is necessary" because third-party code logs to root logger. That's true, but the implementation has 4 routing classes + thread-local state + dual-mode formatters.
Not blocking, but this is the kind of complexity that creates bugs. If a file fails to close, does thread-local cleanup still happen? (Answer: yes, but only because finally block clears attrs - easy to miss)
|
|
||
|
|
||
| def test_evaluation_run_end_to_end(tmp_path): | ||
| """Integration test: run a real Evaluation instance through the async path. | ||
| Creates a TestEvaluation with mock workspaces and verifies that: | ||
| - Multiple instances run concurrently via asyncio | ||
| - Results are collected and written to attempt files | ||
| - Errors produce error outputs (not lost instances) | ||
| - Thread-safe logging is properly set up | ||
| """ | ||
| from typing import List | ||
| from unittest.mock import Mock, patch | ||
|
|
||
| from benchmarks.utils.evaluation import Evaluation | ||
| from benchmarks.utils.models import EvalInstance, EvalMetadata, EvalOutput | ||
| from openhands.sdk import LLM | ||
| from openhands.sdk.critic import PassCritic | ||
|
|
||
| instances = [EvalInstance(id=f"inst_{i}", data={"idx": i}) for i in range(4)] | ||
| # Instance 2 will fail | ||
| fail_ids = {"inst_2"} | ||
|
|
||
| class TestEvaluation(Evaluation): | ||
| def prepare_instances(self) -> List[EvalInstance]: | ||
| return instances | ||
|
|
||
| def prepare_workspace(self, instance, resource_factor=1, forward_env=None): | ||
| ws = Mock() | ||
| ws.__exit__ = Mock() | ||
| return ws | ||
|
|
||
| def evaluate_instance(self, instance, workspace): | ||
| if instance.id in fail_ids: | ||
| raise RuntimeError(f"Simulated failure for {instance.id}") | ||
| return EvalOutput( | ||
| instance_id=instance.id, | ||
| test_result={"ok": True}, | ||
| instruction="test", | ||
| error=None, | ||
| history=[], | ||
| instance=instance.data, | ||
| ) | ||
|
|
||
| llm = LLM(model="test-model") | ||
| metadata = EvalMetadata( | ||
| llm=llm, | ||
| dataset="test", | ||
| dataset_split="test", | ||
| max_iterations=10, | ||
| eval_output_dir=str(tmp_path), | ||
| details={}, | ||
| eval_limit=4, | ||
| max_attempts=1, | ||
| max_retries=0, | ||
| critic=PassCritic(), | ||
| ) | ||
|
|
||
| evaluator = TestEvaluation(metadata=metadata, num_workers=2) | ||
|
|
||
| with patch("benchmarks.utils.evaluation.LaminarService") as mock_lmnr: | ||
| svc = Mock() | ||
| svc.create_evaluation.return_value = None | ||
| svc.create_evaluation_datapoint.return_value = None | ||
| mock_lmnr.get.return_value = svc | ||
|
|
||
| results = evaluator.run() | ||
|
|
||
| # All 4 instances should produce output (3 success + 1 error) | ||
| assert len(results) == 4 | ||
| result_ids = {r.instance_id for r in results} | ||
| assert result_ids == {"inst_0", "inst_1", "inst_2", "inst_3"} | ||
|
|
||
| # Check error output was created for the failing instance | ||
| error_results = [r for r in results if r.error is not None] | ||
| assert len(error_results) == 1 | ||
| assert error_results[0].instance_id == "inst_2" |
🟢 Acceptable - Integration Test Added: This test creates a real Evaluation instance, runs 4 instances (including one that fails), and verifies error outputs are created.
This addresses the "Missing Integration Test" feedback from previous reviews. The test exercises the async path end-to-end.
Minor note: The test mocks LaminarService, but that's acceptable here since you're testing the evaluation orchestration, not Laminar integration.
| @@ -165,28 +171,23 @@ def test_keyboard_interrupt_cleanup(): | |||
| f"Could not get evaluation process PID. Stdout: {stdout_lines}" | |||
| ) | |||
|
|
|||
| # Wait for worker processes to start | |||
| # Wait for worker threads to start. | |||
| # With asyncio.to_thread(), workers are threads inside the process | |||
| # (not child processes), so we can't enumerate them from outside. | |||
| # Instead we verify the process is still alive — if workers hadn't | |||
| # started their 60 s sleep, the process would have exited already. | |||
| print("Waiting for workers to start...") | |||
| time.sleep(3) | |||
|
|
|||
| # Get child processes before interrupt | |||
| children_before = get_child_processes(eval_pid) | |||
| python_workers_before = [ | |||
| p for p in children_before if "python" in p.name().lower() | |||
| ] | |||
| print(f"Worker processes before interrupt: {len(python_workers_before)}") | |||
| print(f"Worker PIDs: {[p.pid for p in python_workers_before]}") | |||
|
|
|||
| # Verify we have worker processes | |||
| assert len(python_workers_before) > 0, ( | |||
| f"No worker processes found. All children: {[(p.pid, p.name()) for p in children_before]}" | |||
| assert process.poll() is None, ( | |||
| f"Process exited prematurely with code {process.returncode}" | |||
| ) | |||
|
|
|||
| # Send SIGINT to the subprocess | |||
| print("\n=== Sending SIGINT ===") | |||
| process.send_signal(signal.SIGINT) | |||
|
|
|||
| # Wait for process to exit | |||
| # Wait for process to exit — clean exit proves all threads stopped | |||
| try: | |||
| process.wait(timeout=10) | |||
| print(f"Process exited with code: {process.returncode}") | |||
| @@ -195,33 +196,15 @@ def test_keyboard_interrupt_cleanup(): | |||
| process.kill() | |||
| process.wait() | |||
|
|
|||
| # Give a moment for cleanup | |||
| time.sleep(2) | |||
|
|
|||
| # Check if all worker processes are gone | |||
| remaining_workers = [] | |||
| for worker in python_workers_before: | |||
| try: | |||
| if psutil.pid_exists(worker.pid): | |||
| proc = psutil.Process(worker.pid) | |||
| # Check if it's still the same process (not reused PID) | |||
| if proc.create_time() == worker.create_time(): | |||
| remaining_workers.append(worker.pid) | |||
| except psutil.NoSuchProcess: | |||
| pass # Process is gone, which is what we want | |||
|
|
|||
| print("\n=== Results ===") | |||
| print(f"Worker processes before: {len(python_workers_before)}") | |||
| print(f"Remaining workers: {len(remaining_workers)}") | |||
| if remaining_workers: | |||
| print(f"Remaining PIDs: {remaining_workers}") | |||
|
|
|||
| # Assert all workers are cleaned up | |||
| assert len(remaining_workers) == 0, ( | |||
| f"Worker processes still running after SIGINT: {remaining_workers}" | |||
| # Verify the process is fully gone (no zombie / leftover children) | |||
| time.sleep(1) | |||
| remaining_children = get_child_processes(eval_pid) | |||
| assert len(remaining_children) == 0, ( | |||
| f"Child processes still running after SIGINT: " | |||
| f"{[(p.pid, p.name()) for p in remaining_children]}" | |||
| ) | |||
|
|
|||
| print("✓ All child processes cleaned up successfully") | |||
| print("✓ Process exited cleanly — all worker threads cleaned up") | |||
|
|
|||
🟠 Important - Test Doesn't Verify What Comment Claims: The comment says "verifies that worker threads are running before the interrupt" (line 109), but the implementation just checks process.poll() is None (line 182).
Issue: This proves the process hasn't exited, NOT that worker threads are actually running. With asyncio, the workers are threads inside the process, so you can't enumerate them from outside.
The test is still valid (proves clean shutdown), but the comment overpromises. Change line 109-110 to:
"""Test that worker threads are properly cleaned up on KeyboardInterrupt.

The asyncio evaluation uses threads (via asyncio.to_thread()), not child
processes. This test verifies that the process exits cleanly after SIGINT.
Worker thread cleanup is implicit - if threads were stuck, the process
wouldn't exit within the timeout.
"""
all-hands-bot
left a comment
Taste Rating: 🟡 Acceptable
The core refactor (ProcessPoolExecutor → asyncio) solves a real production problem (30× memory from worker processes) and is architecturally sound for I/O-bound workloads. The simplified logging approach is pragmatic.
However: The exception handling around span lifecycle is brittle, and there's unnecessary state splitting in the timeout tracking. These should be cleaned up for maintainability.
benchmarks/utils/evaluation.py
Outdated
| # Contextvars tokens from main thread can't detach in worker. | ||
| # Only ignore this specific error, log others. | ||
| if "context" not in str(e).lower(): | ||
| logger.warning("[worker] span.end() RuntimeError: %s", e) |
🟠 Important - Fragile Exception Handling: Catching Exception and checking if "context" is in the string is brittle. If the error message format changes or there's a different contextvars error, this breaks.
Better approach - be explicit about what we're ignoring:
try:
    eval_span.end()
except LookupError:
    # Contextvars tokens from main thread can't detach in worker thread
    # This is expected due to Python's contextvars design
    pass
except Exception as e:
    logger.error("[worker] Unexpected span.end() error: %s", e, exc_info=True)

(Contextvars raises LookupError for token issues, not RuntimeError with "context" in the message.)
benchmarks/utils/evaluation.py
Outdated
| if "context" not in str(e).lower(): | ||
| logger.warning("[worker] exec_span.end() RuntimeError: %s", e) | ||
| except Exception as e: | ||
| logger.warning("[worker] exec_span.end() error: %s", e) |
🟠 Important - Duplicate Fragile Pattern: Same brittle exception handling as above. If you must keep string matching, at least extract it to a helper function so the workaround is in one place:
def safe_end_span(span, context_name="worker"):
    """End span, ignoring contextvars threading limitations."""
    try:
        span.end()
    except LookupError:
        pass  # Expected: contextvars tokens can't cross thread boundaries
    except Exception as e:
        logger.warning("[%s] span.end() error: %s", context_name, e)
benchmarks/utils/evaluation.py
Outdated
| ) -> Tuple[EvalInstance, EvalOutput]: | ||
| """Process one instance with semaphore-based concurrency control.""" | ||
| async with semaphore: | ||
| # Record start time NOW — timeout counts from when the |
🟡 Suggestion - Split State: You track task start times in two places:
- active_start_times dict (line 467)
- PendingInstance.task (line 458)
This creates two sources of truth. The comment "Only tasks present in this dict are subject to timeout" reveals this is a workaround.
Cleaner approach: Add start_time field to PendingInstance and set it when the task acquires the semaphore. Check pending_instances[task].start_time directly instead of a separate dict.
|
|
||
| # Wait for worker processes to start | ||
| # Wait for worker threads to start. | ||
| # With asyncio.to_thread(), workers are threads inside the process |
🟡 Suggestion - Comment Mismatch: The comment on line 174 says "Wait for worker threads to start" and explains why we can't enumerate them, but then says "Instead we verify the process is still alive — if workers hadn't started their 60 s sleep, the process would have exited already."
Issue: This doesn't actually prove threads started — the main asyncio loop keeps the process alive even if no threads are running yet. The 3-second sleep is a heuristic.
Be honest in the comment:
# Wait for worker threads to start (heuristic - we can't enumerate threads from outside)
time.sleep(3)
benchmarks/utils/worker_context.py
Outdated
| @@ -0,0 +1,206 @@ | |||
| """Simple instance-tagged logging for evaluation workers. | |||
🟢 Acceptable - Pragmatic Solution: This shared-log-then-split approach is simpler than the previous complex thread-local routing. While pure structured logging (with instance_id as a field) might be even simpler, this is a reasonable pragmatic solution given the requirement to split logs into per-instance files.
The post-processing in split_logs() is straightforward and easier to reason about than the previous routing classes.
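For illustration, a shared-log-then-split pass might look like the sketch below. It assumes a hypothetical "[instance_id] message" tag format, which may differ from what worker_context.py actually writes:

```python
import re
import tempfile
from collections import defaultdict
from pathlib import Path

def split_logs(shared_log: str, out_dir: str) -> dict:
    # Assumed tag format "[instance_id] message"; the real module's
    # format may differ. Untagged lines are skipped here.
    buckets = defaultdict(list)
    pat = re.compile(r"^\[([^\]]+)\] ?(.*)$")
    for line in Path(shared_log).read_text(encoding="utf-8").splitlines():
        m = pat.match(line)
        if m:
            buckets[m.group(1)].append(m.group(2))
    # One file per instance, written in a single post-processing pass
    for inst, lines in buckets.items():
        Path(out_dir, f"{inst}.log").write_text("\n".join(lines) + "\n", encoding="utf-8")
    return dict(buckets)

with tempfile.TemporaryDirectory() as d:
    shared = Path(d, "shared.log")
    shared.write_text("[inst_0] start\n[inst_1] start\n[inst_0] done\n", encoding="utf-8")
    result = split_logs(str(shared), d)
    assert result["inst_0"] == ["start", "done"]
```

The appeal is exactly what the comment says: a pure function over a finished file is far easier to reason about than live routing classes.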
| Creates a TestEvaluation with mock workspaces and verifies that: | ||
| - Multiple instances run concurrently via asyncio | ||
| - Results are collected and written to attempt files | ||
| - Errors produce error outputs (not lost instances) |
🟢 Good - Integration Test: This test creates a real Evaluation instance and runs multiple instances through the async path, including error handling. This addresses the previous review's concern about missing integration tests.
The mock workspace is acceptable here — testing the full Docker runtime would be too slow for unit tests.
…ndling
- Add start_time field to PendingInstance instead of separate dict
- Create _safe_end_span() helper that catches LookupError (not string matching)
- Fix test comment to be honest about the 3-second heuristic
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
4f6ecde to ce086e7
all-hands-bot
left a comment
Taste Rating: 🔴 Blocking Issues + Unjustified Complexity
The core refactor (ProcessPoolExecutor → asyncio) solves a real problem (30× memory multiplication causing OOM). However:
🔴 BLOCKING: No Evidence This Works
@juanmichelini reported consistent failures with GLM-5 that works on main. The PR description claims memory benefits but provides no links to successful evaluation runs. Without proof this actually works in production, we're trading known-working code for unproven complexity.
🔴 CRITICAL: Complexity Explosion
We've traded 30× memory for 331 lines of thread-routing infrastructure that:
- Shouldn't need to exist (use structured logging with instance_id field)
- Has 3-level exception fallback chains masking real failures
- Uses thread-local state + global initialization + routing classes
This violates "good taste" - we're handling edge cases instead of eliminating them.
Required Before Merge:
- Links to successful SWTBench/multimodal evaluation runs showing this works
- Simplify worker_context.py or justify why 331 lines of routing is necessary
- Address the specific inline comments below
This review was created by an AI assistant (OpenHands) on behalf of the repository maintainers.
| try: | ||
| fh.close() | ||
| except Exception: | ||
| pass |
🔴 Critical - Over-Engineering: This entire 331-line module exists to route logs/stdout to per-thread files. Why?
The Real Problem: We're solving logging the hard way because we're stuck with print() and global logging.getLogger() from third-party code.
Simpler Approach: Use structured logging with instance_id as a field, then filter/split at write time:
logger.info("Processing", extra={"instance_id": inst.id})

Then a single log handler can write to per-instance files by checking record.instance_id.
This eliminates:
- Thread-local state tracking
- All the routing classes
- Complex initialization guards
- 200+ lines of indirection
If you MUST keep this approach, justify why structured logging won't work.
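A runnable sketch of that structured-logging alternative. The InstanceFileHandler class and the file-naming scheme are assumptions for illustration, not code from the PR:

```python
import logging
import tempfile
from pathlib import Path

class InstanceFileHandler(logging.Handler):
    """One handler that fans records out to per-instance files based on
    an instance_id extra field. Hypothetical class, not part of the PR."""

    def __init__(self, log_dir: str) -> None:
        super().__init__()
        self.log_dir = Path(log_dir)
        self._files = {}  # instance_id -> open file

    def emit(self, record: logging.LogRecord) -> None:
        inst = getattr(record, "instance_id", None)
        if inst is None:
            return  # untagged records ignored (or could go to a shared file)
        f = self._files.get(inst)
        if f is None:
            f = self._files[inst] = open(self.log_dir / f"{inst}.log", "a", encoding="utf-8")
        f.write(self.format(record) + "\n")
        f.flush()

    def close(self) -> None:
        for f in self._files.values():
            f.close()
        super().close()

with tempfile.TemporaryDirectory() as d:
    logger = logging.getLogger("eval")
    handler = InstanceFileHandler(d)
    handler.setFormatter(logging.Formatter("%(message)s"))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.info("Processing", extra={"instance_id": "inst_0"})
    logger.removeHandler(handler)
    handler.close()
    content = Path(d, "inst_0.log").read_text(encoding="utf-8")
    assert content == "Processing\n"
```

No thread-local state is needed: the routing key travels inside each LogRecord, so it works the same from any thread.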
|
|
||
| # --- file-like API used by print() and the logging module --------------- | ||
|
|
||
| def write(self, s: str) -> int: | ||
| target = self._target() | ||
| try: | ||
| return target.write(s) # type: ignore[union-attr] | ||
| except (ValueError, OSError): | ||
| # Target closed/broken — try original, then __stderr__ as last resort | ||
| try: | ||
| return self._original.write(s) # type: ignore[union-attr] | ||
| except Exception: | ||
| if sys.__stderr__: | ||
| return sys.__stderr__.write(s) | ||
| return 0 |
🟠 Important - Fragile Fallback Chain: Three levels of exception handling for a simple write:
- Try target.write()
- Catch, try self._original.write()
- Catch, try sys.stderr.write()
Issue: This masks real problems. If the target fails (disk full, permissions), we should KNOW about it, not silently fall back.
Better Approach: Fail fast
def write(self, s: str) -> int:
    target = self._target()
    try:
        return target.write(s)  # type: ignore[union-attr]
    except (ValueError, OSError) as e:
        # File closed/broken is a real error, not something to hide
        logger.error(f"Log write failed: {e}")
        raise

If writes are failing, we need to know, not paper over it.
|
|
||
| def reset_logger_for_multiprocessing(log_dir: str, instance_id: str) -> None: | ||
| """Reset the logger for multiprocessing with instance-specific logging. | ||
|
|
||
| See benchmarks.utils.console_logging.setup_instance_logging for details. | ||
| """ | ||
| from benchmarks.utils.console_logging import setup_instance_logging | ||
|
|
||
| setup_instance_logging(log_dir, instance_id) | ||
|
|
||
|
|
||
| @contextmanager | ||
| def redirect_stdout_stderr(log_file_path: str): | ||
| """Context manager to redirect stdout/stderr to a log file. | ||
|
|
||
| This captures all print() statements, SDK visualizations, and any other | ||
| output that goes to stdout/stderr. | ||
| def _safe_end_span(span: Any, label: str) -> None: | ||
| """End a span, handling contextvars errors from cross-thread usage. | ||
|
|
||
| Args: | ||
| log_file_path: Path to the log file where output should be redirected | ||
| OpenTelemetry spans use contextvars tokens that can only be detached | ||
| in the thread where they were attached. When a span created in the main | ||
| thread is ended in a worker thread, LookupError is raised. | ||
| """ | ||
| # Save original stdout/stderr | ||
| original_stdout = sys.stdout | ||
| original_stderr = sys.stderr | ||
| log_file = None | ||
|
|
||
| try: | ||
| # Open log file in append mode with line buffering | ||
| log_file = open(log_file_path, "a", buffering=1, encoding="utf-8") | ||
|
|
||
| # Redirect stdout and stderr | ||
| sys.stdout = log_file | ||
| sys.stderr = log_file | ||
|
|
||
| yield | ||
|
|
||
| finally: | ||
| # Restore original stdout/stderr | ||
| sys.stdout = original_stdout | ||
| sys.stderr = original_stderr | ||
|
|
||
| # Close the log file if it was opened | ||
| if log_file is not None and not log_file.closed: | ||
| log_file.close() | ||
| span.end() | ||
| except LookupError: | ||
| # Expected when span was created in main thread but ended in worker. | ||
| # The span data is still recorded; only the context detach fails. | ||
| pass | ||
| except Exception as e: | ||
| logger.warning("[worker] %s.end() error: %s", label, e) |
🟡 Acceptable - Pragmatic Workaround: Extracting _safe_end_span is good, but the generic Exception catch is still concerning.
Current Code:
except LookupError:  # Expected
    pass
except Exception as e:
    logger.warning(...)  # Could hide real bugs

Suggestion: Be more specific about what we're ignoring:
except LookupError:
    # Expected: span created in main thread, ended in worker
    pass
except RuntimeError as e:
    if "context" in str(e).lower():
        pass  # Another contextvars variation
    else:
        raise  # Real SDK error

This way we don't accidentally swallow legitimate span.end() failures.
|
|
||
| instance: EvalInstance | ||
| start_time: float | ||
| datapoint_id: UUID | None = None | ||
| task: asyncio.Task | None = field(default=None, repr=False) |
🟡 Suggestion - Sentinel Pattern: Using start_time: float | None = None as sentinel creates split state (also tracked in dict at line 435).
Issue: Two sources of truth for when instance started. The comment at line 474 reveals this: "Reset start_time to NOW so timeout counts from when instance actually begins running".
Cleaner Design: Don't track start_time in PendingInstance at all. Only track in the pending_instances dict when the semaphore is acquired:
@dataclass
class PendingInstance:
    instance: EvalInstance
    datapoint_id: UUID | None = None
    task: asyncio.Task | None = None
    # No start_time here

# In _run_attempt_async:
active_start_times: dict[asyncio.Task, float] = {}  # Only source of truth

async with semaphore:
    task = asyncio.current_task()
    active_start_times[task] = time.monotonic()  # Set once, when work starts

This eliminates the None checks and "reset" logic.
| def emit(self, record: logging.LogRecord) -> None: | ||
| fh: logging.FileHandler | None = getattr(_ctx, "file_handler", None) | ||
| if fh is None: | ||
| return | ||
| record_msg = self.format(record) | ||
| try: | ||
| fh.stream.write(record_msg + "\n") | ||
| fh.stream.flush() | ||
| except (OSError, ValueError): | ||
| # File handler failed (closed file, disk full, etc.) — | ||
| # fall back to stderr so the message isn't silently lost. | ||
| if sys.__stderr__: | ||
| try: | ||
| sys.__stderr__.write(record_msg + "\n") |
🟠 Important - Silent Failure: The fallback to sys.__stderr__ could also fail, and we'd never know.
except (OSError, ValueError):
    if sys.__stderr__:
        try:
            sys.__stderr__.write(record_msg + "\n")
        except Exception:
            pass  # Silently swallowed

Better:
except (OSError, ValueError) as e:
    # Last resort: write to stderr if available
    if sys.__stderr__:
        try:
            sys.__stderr__.write(f"LOG WRITE FAILED: {e}\n{record_msg}\n")
        except Exception:
            pass  # Truly nothing left to do
    # Don't return silently - at least raise so we know logging is broken
    raise RuntimeError(f"All log write attempts failed: {e}")

If we can't write logs anywhere, the evaluation should probably fail rather than continue silently.
| - Errors produce error outputs (not lost instances) | ||
| - Thread-safe logging is properly set up | ||
| """ | ||
| from typing import List | ||
| from unittest.mock import Mock, patch | ||
|
|
||
| from benchmarks.utils.evaluation import Evaluation | ||
| from benchmarks.utils.models import EvalInstance, EvalMetadata, EvalOutput | ||
| from openhands.sdk import LLM | ||
| from openhands.sdk.critic import PassCritic | ||
|
|
||
| instances = [EvalInstance(id=f"inst_{i}", data={"idx": i}) for i in range(4)] | ||
| # Instance 2 will fail | ||
| fail_ids = {"inst_2"} | ||
|
|
||
| class TestEvaluation(Evaluation): | ||
| def prepare_instances(self) -> List[EvalInstance]: | ||
| return instances | ||
|
|
||
| def prepare_workspace(self, instance, resource_factor=1, forward_env=None): | ||
| ws = Mock() | ||
| ws.__exit__ = Mock() | ||
| return ws | ||
|
|
||
| def evaluate_instance(self, instance, workspace): | ||
| if instance.id in fail_ids: | ||
| raise RuntimeError(f"Simulated failure for {instance.id}") | ||
| return EvalOutput( | ||
| instance_id=instance.id, | ||
| test_result={"ok": True}, | ||
| instruction="test", | ||
| error=None, | ||
| history=[], | ||
| instance=instance.data, | ||
| ) | ||
|
|
||
| llm = LLM(model="test-model") | ||
| metadata = EvalMetadata( | ||
| llm=llm, | ||
| dataset="test", | ||
| dataset_split="test", | ||
| max_iterations=10, | ||
| eval_output_dir=str(tmp_path), | ||
| details={}, | ||
| eval_limit=4, | ||
| max_attempts=1, | ||
| max_retries=0, | ||
| critic=PassCritic(), | ||
| ) | ||
|
|
||
| evaluator = TestEvaluation(metadata=metadata, num_workers=2) | ||
|
|
||
| with patch("benchmarks.utils.evaluation.LaminarService") as mock_lmnr: | ||
| svc = Mock() | ||
| svc.create_evaluation.return_value = None | ||
| svc.create_evaluation_datapoint.return_value = None | ||
| mock_lmnr.get.return_value = svc | ||
|
|
||
| results = evaluator.run() | ||
|
|
||
| # All 4 instances should produce output (3 success + 1 error) | ||
| assert len(results) == 4 | ||
| result_ids = {r.instance_id for r in results} | ||
| assert result_ids == {"inst_0", "inst_1", "inst_2", "inst_3"} | ||
|
|
||
| # Check error output was created for the failing instance | ||
| error_results = [r for r in results if r.error is not None] | ||
| assert len(error_results) == 1 | ||
| assert error_results[0].instance_id == "inst_2" | ||
|
|
🟡 Suggestion - Mock Workspace: This integration test uses workspace = Mock() with __exit__ = Mock(), which doesn't exercise the actual workspace cleanup path.
Current Coverage: Tests the asyncio orchestration but not the SDK integration.
Suggestion: Consider adding one test with a real workspace (even if it's just a local Docker container) to verify:
- Thread executor actually runs SDK operations
- Workspace cleanup happens correctly
- Conversation archives are captured
The mock is acceptable for testing the async orchestration patterns, but we should have at least one end-to-end test that proves the SDK integration works.
This partially addresses previous review feedback about integration tests, but real workspace testing would be more convincing.
| # Heuristic wait for worker threads to start. | ||
| # With asyncio.to_thread(), workers are threads inside the process | ||
| # (not child processes), so we can't enumerate them from outside. | ||
| # 3 seconds is a rough guess that workers have started their 60s sleep. | ||
| print("Waiting for workers to start...") | ||
| time.sleep(3) |
🟡 Suggestion - Weak Verification: Comment says "Heuristic wait for worker threads to start" with 3-second sleep + "rough guess".
Issue: This doesn't actually verify threads started, just that the process didn't exit.
Suggestion: Add verification that proves work started:
# Check that at least one log file was created
log_dir = tmp_path / "logs"
time.sleep(3)  # Wait for startup
log_files = list(log_dir.glob("instance_*.log"))
assert len(log_files) > 0, "No worker threads started (no log files created)"

This provides concrete evidence that worker threads actually began processing before the interrupt.
Validation Runs (2026-03-19)

All runs today using

Successful Runs

Failed / Cancelled

Currently Running

Summary
all-hands-bot
left a comment
Taste Rating: 🔴 Blocking Issue — No Evidence This Works
The core refactor (ProcessPoolExecutor → asyncio) solves a real production problem (30× memory multiplication from worker processes). The approach is theoretically sound for I/O-bound workloads.
However:
🔴 BLOCKING: Runtime Failures Reported, Zero Evidence Provided
@juanmichelini reported failures with GLM-5 that works on main:
"Tested it with swebenchmultimodal GLM-5 (which works on main) and it consistently gives the same error"
The PR description claims memory benefits but provides no links to successful evaluation runs. This is unacceptable. You're replacing working code with unproven code.
Required before merge:
- Add an Evidence section to the PR description
- Link to successful SWE-Bench Multi-modal runs (full eval, not toy examples)
- Debug and resolve the GLM-5 failures @juanmichelini reported
- Show this actually reduces memory in production (before/after metrics)
What Improved Since Previous Reviews
✅ Integration tests added (test_async_evaluation.py lines 207, 275) — good response to feedback
✅ Thread-safety locks for file writes (asyncio.Lock) and env vars (threading.Lock) — correct approach
✅ _safe_end_span() extracted to reduce duplication
Remaining Fundamental Issues
These have been discussed in previous reviews but remain unresolved:
- worker_context.py is solving logging the wrong way (331 lines of thread-local routing)
- Silent exception swallowing could hide real bugs
- Split state in timeout tracking (sentinel pattern with start_time: float | None)
See inline comments for specific examples.
VERDICT
❌ Cannot merge: No evidence this works, user reported failures, unresolved design complexity
KEY INSIGHT: Solving a real memory problem by trading working processes for unproven threads + 331 lines of logging infrastructure is not a win until you prove it actually works.
| try: | ||
| fh.close() | ||
| except Exception: | ||
| pass |
🔴 Critical — Over-Engineering: This entire 331-line module exists to route logs/stdout to per-thread files because we're stuck with print() and global logging.getLogger() from third-party code.
The Real Problem: We're solving logging the hard way. Three separate routing classes (_RoutedFileHandler, _RoutedConsoleHandler, _ThreadLocalWriter) all using threading.local() indirection.
Simpler Approach: Use structured logging with instance_id as a field, then filter at write time:
logger.info("Processing", extra={"instance_id": instance.id})
# Filter in handler to write different instance_ids to different files

This would be ~50 lines instead of 331.
But: I acknowledge this has been extensively discussed in previous reviews. If you're committed to this approach, at least prove the whole refactor works first (see blocking evidence issue).
| # The span data is still recorded; only the context detach fails. | ||
| pass | ||
| except Exception as e: | ||
| logger.warning("[worker] %s.end() error: %s", label, e) |
🟠 Important — Brittle Exception Handling: Extracting _safe_end_span reduces duplication (good), but the pattern is still concerning:
except LookupError:
    pass  # Expected
except Exception as e:
    logger.warning(...)  # Could hide real bugs

Issue: If span.end() fails for a DIFFERENT reason than the contextvars issue (network error, SDK bug, disk full), you log a warning and move on. The span data might be lost, but you'll never know why.
Better approach: Be explicit about what you're ignoring:
except (LookupError, RuntimeError) as e:
    if "context" not in str(e).lower():
        raise  # Unexpected error, don't hide it
    # else: expected contextvars thread issue

This is still hacky, but at least it won't silently swallow unrelated failures.
| # fall back to stderr so the message isn't silently lost. | ||
| if sys.__stderr__: | ||
| try: | ||
| sys.__stderr__.write(record_msg + "\n") |
🟠 Important — Silent Failure Chain: Three levels of exception handling for a simple write:
- Try fh.stream.write()
- Catch → try sys.__stderr__.write()
- Catch → pass (silently swallowed)
try:
    fh.stream.write(msg + "\n")
    fh.stream.flush()
except (OSError, ValueError) as e:
    # Last resort: write to a dedicated error file
    with open("/tmp/evaluation_logging_errors.log", "a") as err:
        err.write(f"Failed to write log: {e}\n{msg}\n")

At least then you have a breadcrumb trail when things break.
| Creates a TestEvaluation with mock workspaces and verifies that: | ||
| - Multiple instances run concurrently via asyncio | ||
| - Results are collected and written to attempt files | ||
| - Errors produce error outputs (not lost instances) |
🟢 Good — Integration Test Added: This test creates a real Evaluation instance, runs 4 instances (including one that fails), and verifies outputs are created.
This directly addresses previous review feedback about missing integration tests. The test exercises the async path end-to-end, which is what was needed.
However: The reported GLM-5 failures suggest there's a real-world failure mode these tests don't catch. After you get evidence of successful runs, consider adding a test case that uses the same configuration that failed for @juanmichelini.
| if runtime_failure_count > 0: | ||
| logger.warning( | ||
| f"[child] Instance {instance.id}: " | ||
| f"[worker] Instance {instance.id}: " |
🟢 Good — Thread-Safe Environment Variable Access: Using threading.Lock to protect os.environ["LMNR_SPAN_CONTEXT"] read/write is correct.
This prevents race conditions where one thread overwrites another's span context before prepare_workspace() reads it. Simple, effective, no over-engineering.
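The locking pattern described here can be demonstrated with a small sketch; with_span_context is a hypothetical helper that holds the lock across both the write and the read, so no other worker can clobber the value in between:

```python
import os
import threading

_env_lock = threading.Lock()  # hypothetical module-level lock, as in the fix

def with_span_context(ctx: str, consume):
    # Hold the lock across write AND consume so another worker thread
    # can't overwrite LMNR_SPAN_CONTEXT before it is read.
    with _env_lock:
        os.environ["LMNR_SPAN_CONTEXT"] = ctx
        return consume()

results = []

def worker(i: int) -> None:
    seen = with_span_context(f"ctx-{i}", lambda: os.environ["LMNR_SPAN_CONTEXT"])
    results.append((i, seen))  # list.append is thread-safe under the GIL

threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Each thread saw exactly the context it wrote, never another thread's.
assert all(seen == f"ctx-{i}" for i, seen in results)
```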
The SDK's WebSocket callback thread fires visualization events (Console.print) on a daemon thread that lacks the worker thread's _ctx.log_file. This caused .output.log files to be empty (0 bytes). Fix: add a process-level registry mapping worker thread IDs to their log files, monkey-patch Thread.__init__ to track parent thread IDs, and fall back to the parent's log file in _ThreadLocalWriter._target() when the current thread has no log_file of its own. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
Testing the GLM-5 and swebenchmultimodal issue https://openhands-eval-monitor.vercel.app/?run=swebenchmultimodal%2Flitellm_proxy-openrouter-z-ai-glm-5%2F23316047170%2F |
Summary
Replaces ProcessPoolExecutor with asyncio + semaphore-based concurrency for evaluation orchestration. Since the workload is I/O-bound (HTTP calls to LLM proxy + runtime API), this eliminates the 30x memory multiplication from spawning 30 worker processes — the root cause of OOM failures on long-running SWTBench evaluations (#441).

Key Changes
evaluation.py — Core refactor
- _run_iterative_mode_async() with asyncio.run() bridge from sync _run_iterative_mode()
- _run_attempt_async(): Semaphore-controlled concurrency (asyncio.Semaphore(num_workers)), tasks via asyncio.create_task(), timeout detection via asyncio.wait(FIRST_COMPLETED)
- _process_one_mp → _process_one_sync: Runs in thread executor via asyncio.to_thread() — same sync SDK operations, no code change needed in subclasses
- asyncio.Lock for file writes, threading.Lock for LMNR_SPAN_CONTEXT env var, _safe_end_span() for cross-thread OpenTelemetry span cleanup

worker_context.py — New module
Consolidates all per-thread routing infrastructure into a single threading.local():
- _RoutedFileHandler / _RoutedConsoleHandler: Single root logger handlers that delegate to per-thread formatters/file handlers
- _ThreadLocalWriter: Replaces sys.stdout/sys.stderr with thread-aware writers that route print() to per-instance log files
- instance_context(): Single context manager replacing both setup_instance_logging() and redirect_stdout_stderr()
- initialize(): One-time setup (idempotent) — installs handlers, writers, suppresses noisy OTel context-detach errors

console_logging.py
- setup_instance_logging() (moved into worker_context.instance_context())

Tests
- test_async_evaluation.py (7 tests + 2 integration tests): semaphore concurrency, to_thread, timeout detection, task cancellation, file locking, end-to-end evaluation run, timeout cancellation
- test_keyboard_interrupt.py: Adapted for threads (no child processes to enumerate)
- test_workspace_cleanup.py: _process_one_mp → _process_one_sync

Dependencies
- pytest-asyncio for async test support
- asyncio_mode = "auto" in pyproject.toml

Memory Impact
Backward Compatibility
- Evaluation.run() API unchanged
- num_workers still controls concurrency
- Subclass methods unchanged (prepare_instances, prepare_workspace, evaluate_instance)

Fixes #441
Tracking issue: OpenHands/evaluation#304
Umbrella issue: OpenHands/evaluation#303
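The semaphore-plus-to_thread model from Key Changes can be sketched in miniature; process_one_sync and run_all below are illustrative stand-ins, not the PR's actual functions:

```python
import asyncio
import time

def process_one_sync(instance_id: str) -> str:
    # Stand-in for the blocking per-instance work (workspace + conversation);
    # in the PR this stays as ordinary sync code.
    time.sleep(0.05)
    return f"{instance_id}: ok"

async def run_all(instance_ids: list[str], num_workers: int = 2) -> list[str]:
    sem = asyncio.Semaphore(num_workers)

    async def one(iid: str) -> str:
        async with sem:  # at most num_workers instances in flight
            # Sync work runs in a thread so the event loop stays responsive
            return await asyncio.to_thread(process_one_sync, iid)

    tasks = [asyncio.create_task(one(i)) for i in instance_ids]
    return await asyncio.gather(*tasks)

results = asyncio.run(run_all([f"inst_{i}" for i in range(5)]))
assert results == [f"inst_{i}: ok" for i in range(5)]
```

All concurrency lives in one process: threads share memory, so there is no per-worker interpreter copy, which is the claimed memory win over ProcessPoolExecutor.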