Defer throttled streaming chunks via pending-flush timer #407
Conversation
Previously a delta that arrived inside the throttle window was silently dropped until the next delta showed up; when the LLM stalled near the window edge (cold-start, router handoff, tail-of-thought), the user saw the reply freeze for the rest of the interval before the next batch landed.

Stash the latest accumulated text on each in-window delta and arm a one-shot deferred flush timer that publishes the stash when the window closes. Multiple in-window deltas collapse onto the latest text. While a dispatch is in flight, additional deltas (caller-driven or timer-driven) are stashed and the dispatch loop reflushes the most recent `_pendingText` after the in-flight chunk completes (reflush-on-conflict), so the conversation actor still observes strict edit ordering.

Make the sink `IDisposable` so `ChannelLlmReplyInboxRuntime` cleans up an unfired timer on paths that skip `FinalizeAsync` (interactive reply, empty reply, exception).

Adopts `Microsoft.Extensions.TimeProvider.Testing.FakeTimeProvider` for deterministic timer-driven test control. The hand-rolled `FakeTimeProvider` in `TurnStreamingReplySinkTests` is replaced with the BCL-aligned one.

Refs #405 for the follow-up phase-state-machine + centralized unavailable-guard refactor on `ConversationGAgent`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
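The stash-and-defer pattern described above can be sketched as follows. This is a TypeScript sketch with illustrative names (`ThrottledSink`, `setTimeout` standing in for the C# `TimeProvider.CreateTimer`), not the PR's actual implementation:

```typescript
// Sketch of the stash + one-shot deferred-flush pattern. All names are
// illustrative; the PR implements this in C# on TimeProvider.CreateTimer.
class ThrottledSink {
  private lastFlush = Number.NEGATIVE_INFINITY; // first delta always flushes
  private pendingText: string | null = null;
  private timer: ReturnType<typeof setTimeout> | null = null;

  constructor(
    private readonly windowMs: number,
    private readonly publish: (text: string) => void,
    private readonly now: () => number = Date.now,
  ) {}

  onDelta(accumulatedText: string): void {
    const elapsed = this.now() - this.lastFlush;
    if (elapsed >= this.windowMs) {
      this.flush(accumulatedText);
      return;
    }
    // In-window delta: stash the latest text (later deltas collapse onto
    // it) and arm a one-shot timer that fires when the window closes.
    this.pendingText = accumulatedText;
    if (this.timer === null) {
      this.timer = setTimeout(() => {
        this.timer = null;
        if (this.pendingText !== null) this.flush(this.pendingText);
      }, this.windowMs - elapsed);
    }
  }

  dispose(): void {
    // Clean up an unfired timer on paths that never reach finalize.
    if (this.timer !== null) clearTimeout(this.timer);
    this.timer = null;
    this.pendingText = null;
  }

  private flush(text: string): void {
    this.pendingText = null;
    this.lastFlush = this.now();
    this.publish(text);
  }
}
```

Without the timer, an in-window stash would sit unpublished until the next delta arrived — exactly the freeze described above; the one-shot timer guarantees the stash is published when the window closes even if no further delta comes.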
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 94409bd382
```csharp
_pendingText = text;
_hasPending = true;
return;
```
Wait for in-flight dispatch when finalizing stream
When _dispatchInProgress is true, this path only stashes _pendingText and returns immediately, so FinalizeAsync can complete before the final chunk is dispatched. ChannelLlmReplyInboxRuntime.ProcessAsync then sends LlmReplyReadyEvent right after awaiting FinalizeAsync, and if that ready event is handled first, later chunk events are treated as late and dropped in ConversationGAgent.HandleLlmReplyStreamChunkAsync (the processed-command guard). This makes final-stream delivery race-dependent and can skip the intended final edit path.
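The guard behavior this comment describes can be modeled in a few lines. This is a TypeScript sketch; `processedCommands` and the handler names are illustrative stand-ins for the actual `ConversationGAgent` logic, not its API:

```typescript
// Minimal model of the processed-command guard: once the "ready" event
// for a command has been handled, later chunk events for that command
// are treated as late and silently dropped.
const processedCommands = new Set<string>();
const applied: string[] = [];

function handleChunk(commandId: string, text: string): void {
  if (processedCommands.has(commandId)) return; // late chunk: dropped
  applied.push(text);
}

function handleReady(commandId: string): void {
  processedCommands.add(commandId);
}

// If FinalizeAsync returns before the final chunk is dispatched, the
// runtime can emit the ready event first, and the final edit is lost:
handleChunk("cmd-1", "partial");
handleReady("cmd-1");          // ready wins the race
handleChunk("cmd-1", "final"); // dropped by the guard
```

In this ordering `applied` ends up as `["partial"]` — the final edit never lands, which is the race-dependent drop the review flags.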
Codecov Report

✅ All modified and coverable lines are covered by tests.

```
@@           Coverage Diff            @@
##              dev     #407    +/-  ##
========================================
- Coverage   70.39%   70.38%   -0.01%
========================================
  Files        1175     1175
  Lines       84452    84452
  Branches    11124    11124
========================================
- Hits        59446    59441       -5
- Misses      20715    20720       +5
  Partials     4291     4291
```
Flags with carried forward coverage won't be shown.
`FinalizeAsync` previously stashed the final text and returned immediately when a dispatch was in flight. The inbox runtime then sent `LlmReplyReadyEvent` right after, racing the late final chunk past `ConversationGAgent`'s processed-command guard, which would silently drop it.

`TurnStreamingReplySink` now creates a `_drainTcs` when `FinalizeAsync` stashes onto an in-flight dispatch and awaits it. The dispatch loop signals that TCS once it fully drains (success, swallowed dispatch failure, or exception), so `FinalizeAsync` only returns after the final chunk has been dispatched. `Dispose` also releases any pending waiter so a finalize-then-dispose path cannot hang.

Adds a `FinalizeAsync_DispatchInFlight_WaitsForFinalChunkOnWire` regression test covering the race using a gated mock actor.

Codex review: #407 (comment)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
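The drain-wait plus reflush-on-conflict loop can be sketched like this. TypeScript sketch only: a Promise resolver stands in for the C# `TaskCompletionSource`, and all names (`DispatchingSink`, `runDispatchLoop`) are illustrative, not the sink's real members:

```typescript
// Sketch: reflush-on-conflict dispatch loop + finalize drain-wait.
// A stored Promise resolver plays the role of the C# _drainTcs.
class DispatchingSink {
  private pendingText: string | null = null;
  private dispatchInProgress = false;
  private drainResolve: (() => void) | null = null;

  constructor(private readonly dispatch: (text: string) => Promise<void>) {}

  async onDelta(text: string): Promise<void> {
    if (this.dispatchInProgress) {
      // Stash onto the in-flight dispatch; the loop below reflushes it.
      this.pendingText = text;
      return;
    }
    await this.runDispatchLoop(text);
  }

  async finalize(finalText: string): Promise<void> {
    if (this.dispatchInProgress) {
      this.pendingText = finalText;
      // Wait until the dispatch loop fully drains, so the caller cannot
      // publish "reply ready" before the final chunk is on the wire.
      await new Promise<void>((resolve) => (this.drainResolve = resolve));
      return;
    }
    await this.runDispatchLoop(finalText);
  }

  dispose(): void {
    // Release a pending finalize waiter so finalize-then-dispose cannot hang.
    this.drainResolve?.();
    this.drainResolve = null;
  }

  private async runDispatchLoop(text: string): Promise<void> {
    this.dispatchInProgress = true;
    try {
      let next: string | null = text;
      while (next !== null) {
        const current = next;
        this.pendingText = null;
        await this.dispatch(current);
        next = this.pendingText; // reflush-on-conflict: newest stash wins
      }
    } finally {
      this.dispatchInProgress = false;
      this.drainResolve?.(); // signal finalize that the loop drained
      this.drainResolve = null;
    }
  }
}
```

Because the loop always re-reads the newest stash after each dispatch and only signals the drain waiter in `finally`, `finalize` resolves strictly after the final text has gone through `dispatch`, closing the race described above.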
Summary
`TurnStreamingReplySink` previously dropped any delta that arrived inside the throttle window until the next delta showed up. When the LLM stalled near the window edge (cold-start, router handoff, tail-of-thought), the user saw the reply freeze for the rest of the throttle interval before the next batch landed.

This PR:
- Stashes the latest accumulated text on each in-window delta and arms a one-shot deferred flush timer that publishes the stash when the window closes; multiple in-window deltas collapse onto the latest text.
- Adds reflush-on-conflict (`FlushController`-style): while a dispatch is in flight, additional deltas — caller-driven or timer-driven — are stashed; the dispatch loop reflushes the most recent `_pendingText` after the in-flight chunk completes. The conversation actor still observes strict edit ordering.
- Makes the sink `IDisposable` so `ChannelLlmReplyInboxRuntime` cleans up an unfired timer on paths that skip `FinalizeAsync` (interactive reply, empty reply, exception). Wired via a `using` declaration.
- Adopts `Microsoft.Extensions.TimeProvider.Testing.FakeTimeProvider` for deterministic timer-driven test control. The hand-rolled `FakeTimeProvider` in `TurnStreamingReplySinkTests` is replaced with the BCL-aligned one.

Background: studied https://github.com/ColinLu50/openclaw-lark-stream — `src/card/flush-controller.ts` (mutex + reflush-on-conflict) and `streaming-card-controller.ts` (deferred flush after long gap). This PR adopts the deferred-flush + reflush patterns; it does NOT adopt the OpenClaw CardKit 2.0 streaming card path (we stay on `/reply` + `/reply/update` edit-message via NyxID relay).

Out of scope
Followed up by #405:
- Replace the `Disabled` + `SuppressInterim` boolean pair on `NyxRelayStreamingState` with an explicit phase enum + `PHASE_TRANSITIONS` table.
- Centralize the unavailable-guard on `ConversationGAgent` so future handlers don't have to duplicate the checks.

Test plan
- `TurnStreamingReplySinkTests` — 12/12 pass (7 retained, 5 new: deferred timer flush, finalize-cancels-timer, dedup-against-last-emitted, dispose-prevents-flush, dispose-idempotent).
- `Aevatar.GAgents.ChannelRuntime.Tests` — 166/166 pass.
- `Aevatar.GAgents.Channel.Protocol.Tests` — 119/119 pass.
- `tools/ci/test_stability_guards.sh` — passed (no new `Task.Delay`/`WaitUntilAsync`; uses `TimeProvider.CreateTimer` + `FakeTimeProvider.Advance`).

🤖 Generated with Claude Code