feat: stream reconnect — turn buffers behind /tail and /in-flight (ENG-289)#38
Open
ianu82 wants to merge 3 commits into
…G-289)

The Task view recovers an in-progress turn (notably a scheduled 'Run now') by probing GET /responses/in-flight and tailing GET /responses/tail. After the cowork-server migration both were stubs: /in-flight lacked has_buffer/latest_seq and /tail returned a JSON status blob instead of an SSE replay, so opening a running task showed a stale view until a manual refresh.

- services/stream_buffer.py: in-memory per-conversation turn buffers (seq-numbered append log, async followers, finished-buffer TTL), the lean port of the legacy server's file-backed stream registry
- handlers/responses.py: both paths (streaming AND stream=False, which scheduled runs use) feed the buffer through a shared event sink and close it in a finally
- endpoints/responses.py: /in-flight reports has_buffer + latest_seq, /in-flight-list includes latest_seq, /tail replays from from_seq then follows the live producer with the same SSE framing POST /responses emits, 404 when no buffer (client falls back to history)

Verified live: run-now → mid-run in_flight:true with advancing latest_seq → tail streams created/deltas/completed + [DONE] → post-run in_flight:false with the buffer still replayable. 8 new tests; suite 41 passed.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
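For reviewers who want a mental model of the buffer described in this commit, here is a minimal sketch of a seq-numbered append log with async followers. It is not the code in services/stream_buffer.py: the class name TurnBuffer, the method names, and the convention that from_seq=N replays events numbered above N are all assumptions made for illustration, and the finished-buffer TTL is left out.

```python
import asyncio
from typing import AsyncIterator, List, Tuple


class TurnBuffer:
    """Hypothetical per-conversation buffer: an append-only log plus a wake-up signal."""

    def __init__(self) -> None:
        self.events: List[str] = []
        self.finished = False
        self._cond = asyncio.Condition()

    @property
    def latest_seq(self) -> int:
        # Assumption: events are numbered from 1, so latest_seq is the event count.
        return len(self.events)

    async def append(self, event: str) -> int:
        async with self._cond:
            self.events.append(event)
            self._cond.notify_all()  # wake every follower waiting for new events
            return len(self.events)

    async def finish(self) -> None:
        async with self._cond:
            self.finished = True
            self._cond.notify_all()  # let followers drain the log and exit

    async def follow(self, from_seq: int = 0) -> AsyncIterator[Tuple[int, str]]:
        """Replay events numbered above from_seq, then follow the producer until finish()."""
        seq = from_seq
        while True:
            async with self._cond:
                await self._cond.wait_for(
                    lambda: len(self.events) > seq or self.finished
                )
                batch = self.events[seq:]
            # Yield outside the lock so a slow consumer never blocks the producer.
            for event in batch:
                seq += 1
                yield seq, event
            if self.finished and seq >= len(self.events):
                return


async def demo():
    buf = TurnBuffer()
    await buf.append("response.created")

    async def collect(from_seq: int):
        return [item async for item in buf.follow(from_seq)]

    early = asyncio.create_task(collect(0))  # follower attached mid-turn
    await asyncio.sleep(0)                   # let it replay and start waiting
    await buf.append("delta")
    await buf.append("response.completed")
    await buf.finish()
    late = await collect(1)                  # reconnect after the turn finished
    return await early, late
```

In this sketch a follower attached mid-turn and a follower attached after finish() read from the same log, which is the property the reconnect path relies on.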
This PR instead: #38
run_schedule_now marks the conversation as in-flight and returns the conversation_id before the background task starts. The client then probes /in-flight (sees in_flight=true) and opens /tail, but the handler hasn't called begin_turn yet, so /tail 404s. The client treats that as "nothing to tail", calls onDone, clears the in-flight state, and never reconnects. The task runs to completion server-side but the UI shows stale/idle state.

Fix:

- Create the turn buffer eagerly in run_schedule_now alongside mark_stream_active, so /tail and /in-flight see has_buffer=true from the moment the client can probe.
- Add ensure_buffer() to stream_buffer: returns the existing buffer if one is active, otherwise creates a new one. The handler's _make_event_sink now uses this so it reuses the eagerly-created buffer instead of replacing it (which would orphan any /tail followers already attached).
- Finish orphaned buffers in execute_schedule's finally block so a crash before the handler runs doesn't leave /tail followers waiting forever.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
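The difference between replacing and reusing a buffer can be sketched as follows. begin_turn and ensure_buffer are the names used in this commit message, but the bodies, the Buffer class, and the module-level _buffers registry below are assumptions made for illustration, not the actual implementation.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Buffer:
    """Hypothetical stand-in for a turn buffer."""
    events: List[str] = field(default_factory=list)
    finished: bool = False


# Hypothetical registry: conversation_id -> current buffer.
_buffers: Dict[str, Buffer] = {}


def begin_turn(conversation_id: str) -> Buffer:
    """Always start a fresh buffer, replacing whatever was registered before."""
    buf = Buffer()
    _buffers[conversation_id] = buf
    return buf


def ensure_buffer(conversation_id: str) -> Buffer:
    """Return the active buffer if there is one, otherwise create a new one."""
    existing = _buffers.get(conversation_id)
    if existing is not None and not existing.finished:
        return existing  # reuse, so followers already attached stay connected
    return begin_turn(conversation_id)
```

Under these assumptions, the run-now path calls ensure_buffer first and the handler's event sink calls it again later; both get the same object, so a /tail follower that attached in between is not orphaned.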
Summary
Fixes ENG-289 — opening a scheduled "Run now" task showed a stale view (or a spurious "got disconnected" prompt) until a manual refresh. Replaces cowork#158, which patched the legacy bundled server; per review there, the server work belongs here.
Root cause
The client's recovery contract — built against the legacy server — needs three things the migration left as stubs:
- GET /responses/in-flight returning {in_flight, has_buffer, latest_seq} (the migrated stub had no buffer fields)
- GET /responses/tail?conversation_id&from_seq as an SSE replay+follow of the running turn (the stub returned a JSON status blob, which the client's SSE parser read as an instantly-empty stream)

Notably, run-now itself was already correct here (eager conversation + conversation_id in the response + in-flight registration), and the client already reads it. What was missing was everything the client does next when you open the running task.

What this adds
- services/stream_buffer.py: in-memory, seq-numbered per-conversation turn buffers with async followers and a finished-buffer TTL (the lean port of the legacy file-backed stream registry; single-process server, buffers only need to outlive the reconnect race).
- handlers/responses.py: both paths feed the buffer through a shared event sink, streaming and stream=False; the latter is what scheduled runs execute with, which is exactly ENG-289's case. Buffers close in a finally so tails can't hang on a crashed turn.
- endpoints/responses.py: /in-flight reports has_buffer/latest_seq; /in-flight-list includes latest_seq; /tail replays from from_seq then follows the live producer using the same SSE framing POST /responses emits (client parser reused unchanged), 404 when the buffer is gone (client falls back to history).

No client changes needed: the renderer's existing fetchInFlightStatus/tailInFlight/heartbeat logic works as designed once the server honors the contract.

Verification
Live, in a sandboxed desktop setup (Hermes harness, real LLM):

- POST /schedules/{id}/run-now → conversation_id returned immediately
- Mid-run: in_flight: true, has_buffer: true with latest_seq advancing; /in-flight-list includes the conversation
- GET /tail?from_seq=0 opened mid-run streamed the whole turn live: response.created → text deltas → response.completed → [DONE]
- Post-run: in_flight: false, has_buffer: true; a late reconnect replays the full finished buffer

8 new tests (replay/follow/concurrent followers/TTL, endpoint contract, handler sink); full suite 41 passed.
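As a rough picture of what a /tail replay puts on the wire, the sketch below frames buffered events as Server-Sent Events and closes with a [DONE] sentinel. The exact framing POST /responses emits is not shown in this PR, so the data-only framing, the JSON payload shape, and the helper names sse_data and tail_frames are assumptions; only the event type names and the [DONE] terminator come from the verification steps above.

```python
import json
from typing import Dict, Iterable, Iterator


def sse_data(payload: str) -> str:
    """Frame one payload as an SSE message: a data line followed by a blank line."""
    return f"data: {payload}\n\n"


def tail_frames(events: Iterable[Dict], from_seq: int = 0) -> Iterator[str]:
    """Replay a finished buffer, skipping events numbered at or below from_seq.

    Assumption: events are numbered from 1, so from_seq=0 replays the whole turn.
    """
    for seq, event in enumerate(events, start=1):
        if seq > from_seq:
            yield sse_data(json.dumps(event))
    yield sse_data("[DONE]")  # terminator listed in the verification steps


frames = list(
    tail_frames(
        [{"type": "response.created"}, {"type": "response.completed"}],
        from_seq=1,
    )
)
```

With from_seq=1 the first event is skipped, so a client that already rendered it does not see a duplicate on reconnect.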
🤖 Generated with Claude Code