[connector] Fix tp_coordinator race condition when finish event arrives before start event#208
[connector] Fix tp_coordinator race condition when finish event arrives before start event#208lpdink wants to merge 1 commit into
Conversation
…ore start - Buffer SendBlockFinishedEvent in pending_save_finishes when SendBlockStartEvent has not arrived yet, instead of asserting and crashing - Merge buffered finish events when SendBlockStartEvent arrives - Aligns save flow with load flow's existing tolerant pattern - Add unit tests covering normal order, out-of-order, mixed order, and duplicate finish scenarios
There was a problem hiding this comment.
Review Summary
The core fix is sound — replacing the assert with a pending buffer correctly handles the ZMQ cross-sender ordering race, and the approach aligns well with the existing load-flow tolerant pattern. SaveContext.add_new_rank already deduplicates by rank, so duplicate finishes are handled safely in both the buffered and unbuffered paths.
Suggestions (non-blocking):
-
Duplicated completion logic: The callback/completion block now appears in both the start-event and finish-event handlers. Extracting a helper would reduce the risk of them diverging over time.
-
Test cleanup:
tearDowndoesn't join the coordinator thread or close ZMQ contexts/sockets, which can leak resources between tests. -
Fragile
time.sleep(0.1): The server-readiness wait could fail under load; a polling approach would be more robust. -
Missing test coverage: No test exercises duplicate finish events buffered before the start event (both hits the
pending_save_finishespath).add_new_rankdeduplicates correctly, but a test confirming this would be valuable.
🤖 Generated by Qoder
| if running_save[save_id].get_size() == self._tp_world_size: | ||
| save_context = running_save.pop(save_id) |
There was a problem hiding this comment.
The completion-check and callback logic is now duplicated in two places: the start-event handler (lines 156-161) and the finish-event handler (lines 175-184). If someone updates one path but not the other, it could introduce subtle bugs. Consider extracting a small helper like _complete_save(save_id, content, running_save) to consolidate this.
🤖 Generated by Qoder
| self.assertEqual(session_id, "session-1") | ||
| self.assertEqual(save_context.get_size(), 2) | ||
|
|
||
| def tearDown(self): |
There was a problem hiding this comment.
tearDown sets _coordinator_running = False but doesn't join the coordinator thread or close the ZMQ client/server sockets. This can leave dangling threads and file descriptors between tests. Consider calling self.server._coordinator_thread.join(timeout=2) and closing self.client._zmq_context / the server's ZMQ context in tearDown.
🤖 Generated by Qoder
| self.server = TpCoordinatorServer("127.0.0.1", self.port, 2, on_finished) | ||
| self.client = TpCoordinatorClient("127.0.0.1", self.port) | ||
| # Give coordinator thread time to bind | ||
| time.sleep(0.1) |
There was a problem hiding this comment.
time.sleep(0.1) is a fragile way to wait for server readiness — on a slow or loaded CI runner, 100ms may not be enough. A more robust approach is to poll with a short client connection attempt, or at minimum use a retry loop with a timeout.
🤖 Generated by Qoder
| self.client = TpCoordinatorClient("127.0.0.1", self.port) | ||
| time.sleep(0.1) | ||
|
|
||
| def test_duplicate_finish_ignored(self): |
There was a problem hiding this comment.
This test only covers duplicate finish when start arrives first (the else branch in the finish handler). It doesn't test the case where duplicate finishes from the same rank are both buffered in pending_save_finishes before the start arrives. Since add_new_rank deduplicates by rank, this should work correctly, but it'd be good to have a test confirming that path — e.g., send two finish(rank=0) before start, then verify get_size() == 1 after start merges them.
🤖 Generated by Qoder
Summary
Fix a race condition in
TpCoordinatorServerwhereSendBlockFinishedEventfrom TP Workers could arrive beforeSendBlockStartEventfrom the Scheduler, causing anassertcrash. The fix buffers out-of-order finish events and merges them when the start event arrives, aligning the save flow with the load flow's existing tolerant pattern.Motivation
The vllm Connector's save flow uses ZMQ for async communication between the Scheduler and TP Workers:
start_write_cache(HTTP), then sendsSendBlockStartEventvia ZMQ to register the save contextSendBlockFinishedEventvia ZMQ to report resultsThese two messages come from different ZMQ PUSH sockets connecting to the same PULL socket. ZMQ does not guarantee cross-sender message ordering. When data transfer is very fast (small data, local storage), the finish event can arrive before the start event, triggering:
This is not backend-specific — it can happen with any storage backend when transfer latency is low enough.
How It Works
Before (fragile):
After (tolerant):
This mirrors the existing load flow, which already uses a tolerant
if load_id not in running_load: createpattern.Changes
common/tp_coordinator.pyassertwith pending buffer; merge on start event arrivaltest/test_tp_coordinator.py2 files, +296 lines
Known Limitations
Pending buffer has no timeout cleanup: If
SendBlockStartEventnever arrives (upstream bug), buffered finish events remain in memory until the coordinator thread exits. This is acceptable because a missing start event indicates a more serious issue that should be investigated.Only affects vllm connector: The
TpCoordinatorServeris specific to the vllm connector's TP coordination. sglang and trtllm connectors use different mechanisms and are not affected.Normal-order path unchanged: When messages arrive in the expected order (start before finish), behavior is identical to the previous implementation — no performance impact.
Testing
Unit tests (4 cases, all pass):
test_normal_order_callback_triggeredtest_finish_before_start_no_crashtest_mixed_order_multiple_rankstest_duplicate_finish_ignoredFull test suite: All 83 bazel tests pass, no regressions.