fix: multi-turn benchmark hangs after all clients finish #908
base: experimental/multi-turn-benchmark
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | | `@@ -1098,6 +1098,16 @@ def _duration_expired():` |
| | | `while not result_queue.empty():` |
| | | `client_metrics.append(result_queue.get())` |
| | | `pbar.update(1)` |
| | | `+ # Fallback: detect dead children whose TERM_SIGNAL was lost` |
| | | `+ dead = sum(1 for c in clients if not c.is_alive())` |
| | | `+ if dead >= bench_args.num_clients:` |
| | | `+ lost = bench_args.num_clients - num_clients_finished` |
| | | `+ logger.warning(` |
| | | `+ f"{Color.YELLOW}All client processes are dead but "` |
| | | `+ f"{lost} TERM_SIGNALs were lost — breaking out of "` |
| | | `+ f"main loop{Color.RESET}"` |
| | | `+ )` |
| | | `+ break` |
| | | `continue` |
| | | `# Collect results (measurements)` |
| | | `@@ -2011,3 +2021,7 @@ async def main() -> None:` |
| | | `if __name__ == "__main__":` |
| | | `asyncio.run(main())` |
| | | `+ # Force exit to avoid hanging on multiprocessing Queue feeder threads` |
| | | `+ # during interpreter shutdown (the queues may still have thousands of` |
| | | `+ # undelivered items whose feeder threads deadlock on GC).` |
| | | `+ os._exit(0)` |
Comment on lines +2024 to +2027

Contributor

🟡 Extended reasoning...

Analysis

Code path

The …

Why this manifests in practice

When stdout is connected to a terminal, Python uses line buffering, so each …

Step-by-step proof

…

Fix

The fix is trivial: add explicit flushes before the `os._exit(0)` call:

    if __name__ == "__main__":
        asyncio.run(main())
        sys.stdout.flush()
        sys.stderr.flush()
        os._exit(0)

This preserves the deadlock-avoidance benefit of …
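The buffering concern behind this suggestion can be checked in isolation. The following is a minimal sketch, not code from the PR: it launches two hypothetical child scripts with stdout connected to a pipe (where Python block-buffers output) and compares what survives `os._exit(0)` with and without an explicit flush. The names `CHILD_NO_FLUSH`, `CHILD_WITH_FLUSH`, and `run_child` are illustrative assumptions.

```python
import os
import subprocess
import sys

# Hypothetical child scripts: both print one line and then hard-exit.
CHILD_NO_FLUSH = (
    "import os\n"
    "print('final summary')\n"
    "os._exit(0)\n"
)
CHILD_WITH_FLUSH = (
    "import os, sys\n"
    "print('final summary')\n"
    "sys.stdout.flush()\n"
    "sys.stderr.flush()\n"
    "os._exit(0)\n"
)


def run_child(source: str) -> str:
    """Run `source` in a fresh interpreter and return its captured stdout."""
    # Drop PYTHONUNBUFFERED so the child uses default (block) buffering on a pipe.
    env = {k: v for k, v in os.environ.items() if k != "PYTHONUNBUFFERED"}
    proc = subprocess.run(
        [sys.executable, "-c", source],
        capture_output=True,
        text=True,
        check=True,
        env=env,
    )
    return proc.stdout


lost = run_child(CHILD_NO_FLUSH)    # os._exit skips the stdio flush
kept = run_child(CHILD_WITH_FLUSH)  # explicit flush runs before os._exit
print(repr(lost), repr(kept))
```

When output goes to a file or pipe rather than a terminal, the unflushed variant is the one most likely to drop the final lines, which is why the flush calls matter for benchmark runs whose output is redirected.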
🟣 Pre-existing bug: The `asyncio.wait_for(loop.run_in_executor(None, conv_queue.get), timeout=1.0)` pattern is the root cause of the lost TERM_SIGNALs that the dead-children fallback works around. When `wait_for` times out, it cancels the asyncio Future but cannot stop the underlying thread still blocked on `queue.get()`, so stale threads accumulate and silently consume queue items. Replace with `loop.run_in_executor(None, lambda: conv_queue.get(timeout=1.0))` and catch `queue.Empty` to eliminate the race.

Extended reasoning...
The anti-pattern

The code at lines 1092-1095 uses `asyncio.wait_for(loop.run_in_executor(None, conv_queue.get), timeout=1.0)`. This is a well-known Python anti-pattern when combining `asyncio.wait_for` with `run_in_executor` on a blocking call that has no internal timeout.

How it fails
When `wait_for` times out after 1 second, it cancels the asyncio Future wrapper. However, the underlying worker thread started by `run_in_executor` is already running and blocked on `conv_queue.get()`. `concurrent.futures.Future.cancel()` returns `False` for a running task, so the thread stays blocked until an item arrives. On the next loop iteration, `run_in_executor` starts another worker thread, also blocking on `conv_queue.get()`. After N timeouts, there are N+1 threads competing for queue items (up to the default executor's worker limit).

Step-by-step proof of item loss
1. `run_in_executor` spawns Thread-A blocking on `conv_queue.get()`. After 1s, `wait_for` times out and cancels asyncio Future-A.
2. On the next iteration, `run_in_executor` spawns Thread-B blocking on `conv_queue.get()`. Now Thread-A and Thread-B both wait on the queue.
3. A finished client puts `(TERM_SIGNAL, TERM_SIGNAL)` on `conv_queue`.
4. Thread-A returns from `conv_queue.get()`, consuming the item. It sets the result on its `concurrent.futures.Future`.
5. The done callback (`_chain_future` / `_copy_future_state` in CPython asyncio) checks the destination asyncio Future-A, which was already cancelled in step 1. The callback calls `if dest.cancelled(): return`, silently discarding the result.
6. `num_clients_finished` is never incremented for that client.

This exactly explains the PR description observation: all 32 clients logged "is done" (printed after `conv_queue.put((TERM_SIGNAL, TERM_SIGNAL))` at lines 818-822), but only 24 TERM_SIGNALs were received by the main loop. The other 8 were consumed by stale executor threads.

Impact
Without the dead-children fallback added by this PR, the main loop would hang forever waiting for TERM_SIGNALs that were already consumed and discarded. The fallback correctly works around the symptom, but the root cause remains: queue items can be silently lost, and stale threads accumulate over the benchmark lifetime.
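The item loss traced in the step-by-step proof can be reproduced in a few lines. This is a minimal sketch under stated assumptions, not the benchmark's code: it uses a plain `queue.Queue` in place of the multiprocessing queue, a shortened 0.1 s timeout, and a hypothetical `demo_lost_item` coroutine.

```python
import asyncio
import queue


async def demo_lost_item() -> dict:
    """Show a stale executor thread consuming an item nobody receives."""
    loop = asyncio.get_running_loop()
    q: queue.Queue = queue.Queue()  # stand-in for conv_queue (assumption)
    received = []

    try:
        # Anti-pattern: the blocking get() has no timeout of its own.
        item = await asyncio.wait_for(
            loop.run_in_executor(None, q.get), timeout=0.1
        )
        received.append(item)
    except asyncio.TimeoutError:
        # The asyncio Future is cancelled, but the worker thread is
        # still blocked inside q.get().
        pass

    # A "client" now publishes its termination marker.
    q.put("TERM_SIGNAL")

    # Give the stale worker thread time to wake up and take the item.
    await asyncio.sleep(0.5)

    return {"received": received, "left_in_queue": q.qsize()}


result = asyncio.run(demo_lost_item())
print(result)
```

The queue ends up empty even though the coroutine never received anything: the stale thread took the item and its result was dropped because the corresponding asyncio Future had already been cancelled.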
Fix
Replace the `wait_for` + `run_in_executor` pattern with a thread-level timeout: call `conv_queue.get(timeout=1.0)` inside the executor and catch `queue.Empty`. This way the thread itself times out cleanly via `queue.get(timeout=1.0)`, no items are silently consumed, and no stale threads accumulate. The dead-children fallback could then be kept as a safety net rather than being required for correctness.
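A thread-level timeout along the lines described above might look like the following. This is a sketch, not the PR's actual change: the helper name `get_with_thread_timeout`, the use of `None` as the "nothing arrived" marker, and the plain `queue.Queue` stand-in are all assumptions made for illustration.

```python
import asyncio
import queue


async def get_with_thread_timeout(q, timeout: float = 1.0):
    """Fetch one item from a blocking queue without leaving a stale thread.

    Returns the item, or None if nothing arrived within `timeout` seconds.
    (None as the marker is an assumption; it is ambiguous if None can be
    a legitimate queue item.)
    """
    loop = asyncio.get_running_loop()
    try:
        # The timeout lives inside the worker thread, so the thread exits
        # on its own when the queue stays empty.
        return await loop.run_in_executor(None, lambda: q.get(timeout=timeout))
    except queue.Empty:
        return None


async def demo_fix() -> tuple:
    q: queue.Queue = queue.Queue()  # stand-in for conv_queue (assumption)
    first = await get_with_thread_timeout(q, timeout=0.1)   # queue is empty
    q.put("TERM_SIGNAL")
    second = await get_with_thread_timeout(q, timeout=0.1)  # item is delivered
    return first, second


outcome = asyncio.run(demo_fix())
print(outcome)
```

Because the awaited future is never cancelled by `wait_for`, every item a worker thread takes from the queue is handed back to the caller. `multiprocessing.Queue.get(timeout=...)` raises the same `queue.Empty` exception, so the same shape should carry over to the benchmark's queue.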