From 8c16ced371f10ae36db942d2d86176aee7c0470e Mon Sep 17 00:00:00 2001 From: Val Alexander Date: Mon, 13 Apr 2026 00:51:47 -0500 Subject: [PATCH] Stabilize runtime event and config refresh handling - Add replayable provider runtime event feed - Refresh provider status snapshots on demand - Keep websocket broadcasts flowing when one send fails --- apps/server/src/main.ts | 2 + .../Layers/CheckpointReactor.test.ts | 16 +-- .../orchestration/Layers/CheckpointReactor.ts | 4 +- .../Layers/OrchestrationReactor.ts | 9 +- .../Layers/ProviderRuntimeIngestion.test.ts | 17 ++- .../Layers/ProviderRuntimeIngestion.ts | 4 +- .../Layers/ProviderRuntimeEventFeed.test.ts | 43 +++++++ .../Layers/ProviderRuntimeEventFeed.ts | 98 +++++++++++++++ .../provider/Layers/ProviderService.test.ts | 7 ++ .../src/provider/Layers/ProviderService.ts | 3 + .../Services/ProviderRuntimeEventFeed.ts | 13 ++ apps/server/src/serverLayers.ts | 3 +- apps/server/src/wsServer.test.ts | 117 +++++++++++++++++- apps/server/src/wsServer.ts | 36 ++++-- apps/server/src/wsServer/pushBus.test.ts | 61 +++++++++ apps/server/src/wsServer/pushBus.ts | 21 +++- apps/web/src/lib/connectionSync.ts | 6 +- apps/web/src/lib/snapshotSyncManager.test.ts | 98 +++++++++++++++ apps/web/src/lib/snapshotSyncManager.ts | 101 +++++++++++++++ apps/web/src/routes/__root.tsx | 89 +++++-------- apps/web/src/wsNativeApi.test.ts | 30 +++++ apps/web/src/wsNativeApi.ts | 23 +++- bun.lock | 1 - 23 files changed, 702 insertions(+), 100 deletions(-) create mode 100644 apps/server/src/provider/Layers/ProviderRuntimeEventFeed.test.ts create mode 100644 apps/server/src/provider/Layers/ProviderRuntimeEventFeed.ts create mode 100644 apps/server/src/provider/Services/ProviderRuntimeEventFeed.ts create mode 100644 apps/web/src/lib/snapshotSyncManager.test.ts create mode 100644 apps/web/src/lib/snapshotSyncManager.ts diff --git a/apps/server/src/main.ts b/apps/server/src/main.ts index 2a9747dc8..986425347 100644 --- a/apps/server/src/main.ts +++ b/apps/server/src/main.ts @@ -23,6 +23,7 @@ import * as SqlitePersistence from "./persistence/Layers/Sqlite"; import { OpenclawGatewayConfigLive } from "./persistence/Layers/OpenclawGatewayConfig"; import { makeServerProviderLayer, makeServerRuntimeServicesLayer } from "./serverLayers"; import { ProviderHealthLive } from "./provider/Layers/ProviderHealth"; +import { ProviderRuntimeEventFeedLive } from "./provider/Layers/ProviderRuntimeEventFeed"; import { Server } from "./wsServer"; import { ServerLoggerLive } from "./serverLogger"; import { doctorCmd } from "./doctor"; @@ -195,6 +196,7 @@ const LayerLive = (input: CliInput) => Layer.empty.pipe( Layer.provideMerge(makeServerRuntimeServicesLayer()), Layer.provideMerge(makeServerProviderLayer()), + Layer.provideMerge(ProviderRuntimeEventFeedLive), Layer.provideMerge(OpenclawGatewayConfigLive), Layer.provideMerge(ProviderHealthLive), Layer.provideMerge(SqlitePersistence.layerConfig), diff --git a/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts b/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts index b60a514b2..c7fad3350 100644 --- a/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts +++ b/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts @@ -14,7 +14,7 @@ import { TurnId, } from "@okcode/contracts"; import * as NodeServices from "@effect/platform-node/NodeServices"; -import { Effect, Exit, Layer, ManagedRuntime, PubSub, Scope, Stream } from "effect"; +import { Effect, Exit, Layer, ManagedRuntime, Scope, Stream } from "effect"; import { afterEach, describe, expect, it, vi } from "vitest"; import { CheckpointStoreLive } from "../../checkpointing/Layers/CheckpointStore.ts"; @@ -36,6 +36,8 @@ import { ProviderService, type ProviderServiceShape, } from "../../provider/Services/ProviderService.ts"; +import { ProviderRuntimeEventFeedLive } from "../../provider/Layers/ProviderRuntimeEventFeed.ts"; +import { ProviderRuntimeEventFeed } from "../../provider/Services/ProviderRuntimeEventFeed.ts"; import { checkpointRefForThreadTurn } from "../../checkpointing/Utils.ts"; import { ServerConfig } from "../../config.ts"; @@ -62,7 +64,6 @@ function createProviderServiceHarness( providerName: ProviderSession["provider"] = "codex", ) { const now = new Date().toISOString(); - const runtimeEventPubSub = Effect.runSync(PubSub.unbounded()); const rollbackConversation = vi.fn( (_input: { readonly threadId: ThreadId; readonly numTurns: number }) => Effect.void, ); @@ -93,17 +94,12 @@ function createProviderServiceHarness( listSessions, getCapabilities: () => Effect.succeed({ sessionModelSwitch: "in-session" }), rollbackConversation, - streamEvents: Stream.fromPubSub(runtimeEventPubSub), - }; - - const emit = (event: LegacyProviderRuntimeEvent): void => { - Effect.runSync(PubSub.publish(runtimeEventPubSub, event as unknown as ProviderRuntimeEvent)); + streamEvents: Stream.empty, }; return { service, rollbackConversation, - emit, }; } @@ -267,6 +263,7 @@ describe("CheckpointReactor", () => { Layer.provideMerge(orchestrationLayer), Layer.provideMerge(RuntimeReceiptBusLive), Layer.provideMerge(Layer.succeed(ProviderService, provider.service)), + Layer.provideMerge(ProviderRuntimeEventFeedLive), Layer.provideMerge(CheckpointStoreLive.pipe(Layer.provide(GitCoreLive))), Layer.provideMerge(ServerConfigLayer), Layer.provideMerge(NodeServices.layer), @@ -276,6 +273,7 @@ describe("CheckpointReactor", () => { const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService)); const reactor = await runtime.runPromise(Effect.service(CheckpointReactor)); const checkpointStore = await runtime.runPromise(Effect.service(CheckpointStore)); + const eventFeed = await runtime.runPromise(Effect.service(ProviderRuntimeEventFeed)); scope = await Effect.runPromise(Scope.make("sequential")); await Effect.runPromise(reactor.start.pipe(Scope.provide(scope))); const drain = () => Effect.runPromise(reactor.drain); @@ -335,6 +333,8 @@ describe("CheckpointReactor", () => { engine, provider, cwd, + emit: (event: LegacyProviderRuntimeEvent) => + Effect.runSync(eventFeed.publish(event as unknown as ProviderRuntimeEvent)), drain, }; } diff --git a/apps/server/src/orchestration/Layers/CheckpointReactor.ts b/apps/server/src/orchestration/Layers/CheckpointReactor.ts index a4dc2c896..2a4a72d02 100644 --- a/apps/server/src/orchestration/Layers/CheckpointReactor.ts +++ b/apps/server/src/orchestration/Layers/CheckpointReactor.ts @@ -19,6 +19,7 @@ import { import { clearWorkspaceIndexCache } from "../../workspaceEntries.ts"; import { CheckpointStore } from "../../checkpointing/Services/CheckpointStore.ts"; import { ProviderService } from "../../provider/Services/ProviderService.ts"; +import { ProviderRuntimeEventFeed } from "../../provider/Services/ProviderRuntimeEventFeed.ts"; import { CheckpointReactor, type CheckpointReactorShape } from "../Services/CheckpointReactor.ts"; import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts"; import { RuntimeReceiptBus } from "../Services/RuntimeReceiptBus.ts"; @@ -66,6 +67,7 @@ const serverCommandId = (tag: string): CommandId => const make = Effect.gen(function* () { const orchestrationEngine = yield* OrchestrationEngineService; const providerService = yield* ProviderService; + const providerRuntimeEventFeed = yield* ProviderRuntimeEventFeed; const checkpointStore = yield* CheckpointStore; const receiptBus = yield* RuntimeReceiptBus; @@ -782,7 +784,7 @@ const make = Effect.gen(function* () { ); yield* Effect.forkScoped( - Stream.runForEach(providerService.streamEvents, (event) => { + Stream.runForEach(providerRuntimeEventFeed.subscribeWithReplay(), (event) => { if (event.type !== "turn.started" && event.type !== "turn.completed") { return Effect.void; } diff --git a/apps/server/src/orchestration/Layers/OrchestrationReactor.ts b/apps/server/src/orchestration/Layers/OrchestrationReactor.ts index 1e498885a..c5408beaf 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationReactor.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationReactor.ts @@ -13,11 +13,10 @@ export const makeOrchestrationReactor = Effect.gen(function* () { const providerCommandReactor = yield* ProviderCommandReactor; const checkpointReactor = yield* CheckpointReactor; - const start: OrchestrationReactorShape["start"] = Effect.gen(function* () { - yield* providerRuntimeIngestion.start; - yield* providerCommandReactor.start; - yield* checkpointReactor.start; - }); + const start: OrchestrationReactorShape["start"] = Effect.all( + [providerRuntimeIngestion.start, providerCommandReactor.start, checkpointReactor.start], + { concurrency: "unbounded", discard: true }, + ); return { start, diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 3f7c66bf7..3697399fc 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -19,7 +19,7 @@ import { ThreadId, TurnId, } from "@okcode/contracts"; -import { Effect, Exit, Layer, ManagedRuntime, PubSub, Scope, Stream } from "effect"; +import { Effect, Exit, Layer, ManagedRuntime, Scope, Stream } from "effect"; import { afterEach, describe, expect, it } from "vitest"; import { OrchestrationEventStoreLive } from "../../persistence/Layers/OrchestrationEventStore.ts"; @@ -29,6 +29,8 @@ import { ProviderService, type ProviderServiceShape, } from "../../provider/Services/ProviderService.ts"; +import { ProviderRuntimeEventFeedLive } from "../../provider/Layers/ProviderRuntimeEventFeed.ts"; +import { ProviderRuntimeEventFeed } from "../../provider/Services/ProviderRuntimeEventFeed.ts"; import { OrchestrationEngineLive } from "./OrchestrationEngine.ts"; import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts"; import { ProviderRuntimeIngestionLive } from "./ProviderRuntimeIngestion.ts"; @@ -61,7 +63,6 @@ type LegacyProviderRuntimeEvent = { }; function createProviderServiceHarness() { - const runtimeEventPubSub = Effect.runSync(PubSub.unbounded()); const runtimeSessions: ProviderSession[] = []; const unsupported = () => Effect.die(new Error("Unsupported provider call in test")) as never; @@ -75,7 +76,7 @@ function createProviderServiceHarness() { listSessions: () => Effect.succeed([...runtimeSessions]), getCapabilities: () => Effect.succeed({ sessionModelSwitch: "in-session" }), rollbackConversation: () => unsupported(), - streamEvents: Stream.fromPubSub(runtimeEventPubSub), + streamEvents: Stream.empty, }; const setSession = (session: ProviderSession): void => { @@ -87,13 +88,8 @@ function createProviderServiceHarness() { runtimeSessions.push(session); }; - const emit = (event: LegacyProviderRuntimeEvent): void => { - Effect.runSync(PubSub.publish(runtimeEventPubSub, event as unknown as ProviderRuntimeEvent)); - }; - return { service, - emit, setSession, }; } @@ -169,12 +165,14 @@ describe("ProviderRuntimeIngestion", () => { Layer.provideMerge(orchestrationLayer), Layer.provideMerge(SqlitePersistenceMemory), Layer.provideMerge(Layer.succeed(ProviderService, provider.service)), + Layer.provideMerge(ProviderRuntimeEventFeedLive), Layer.provideMerge(ServerConfig.layerTest(process.cwd(), process.cwd())), Layer.provideMerge(NodeServices.layer), ); runtime = ManagedRuntime.make(layer); const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService)); const ingestion = await runtime.runPromise(Effect.service(ProviderRuntimeIngestionService)); + const eventFeed = await runtime.runPromise(Effect.service(ProviderRuntimeEventFeed)); scope = await Effect.runPromise(Scope.make("sequential")); await Effect.runPromise(ingestion.start.pipe(Scope.provide(scope))); const drain = () => Effect.runPromise(ingestion.drain); @@ -234,7 +232,8 @@ describe("ProviderRuntimeIngestion", () => { return { engine, - emit: provider.emit, + emit: (event: LegacyProviderRuntimeEvent) => + Effect.runSync(eventFeed.publish(event as unknown as ProviderRuntimeEvent)), setProviderSession: provider.setSession, drain, }; diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 63b470781..82d05c0d8 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -17,6 +17,7 @@ import { Cache, Cause, Duration, Effect, Layer, Option, Ref, Stream } from "effe import { makeDrainableWorker } from "@okcode/shared/DrainableWorker"; import { ProviderService } from "../../provider/Services/ProviderService.ts"; +import { ProviderRuntimeEventFeed } from "../../provider/Services/ProviderRuntimeEventFeed.ts"; import { ProjectionTurnRepository } from "../../persistence/Services/ProjectionTurns.ts"; import { ProjectionTurnRepositoryLive } from "../../persistence/Layers/ProjectionTurns.ts"; import { resolveThreadWorkspaceCwd } from "../../checkpointing/Utils.ts"; @@ -537,6 +538,7 @@ function runtimeEventToActivities( const make = Effect.gen(function* () { const orchestrationEngine = yield* OrchestrationEngineService; const providerService = yield* ProviderService; + const providerRuntimeEventFeed = yield* ProviderRuntimeEventFeed; const projectionTurnRepository = yield* ProjectionTurnRepository; const assistantDeliveryModeRef = yield* Ref.make( @@ -1285,7 +1287,7 @@ const make = Effect.gen(function* () { const start: ProviderRuntimeIngestionShape["start"] = Effect.gen(function* () { yield* Effect.forkScoped( - Stream.runForEach(providerService.streamEvents, (event) => + Stream.runForEach(providerRuntimeEventFeed.subscribeWithReplay(), (event) => worker.enqueue({ source: "runtime", event }), ), ); diff --git a/apps/server/src/provider/Layers/ProviderRuntimeEventFeed.test.ts b/apps/server/src/provider/Layers/ProviderRuntimeEventFeed.test.ts new file mode 100644 index 000000000..207ee2288 --- /dev/null +++ b/apps/server/src/provider/Layers/ProviderRuntimeEventFeed.test.ts @@ -0,0 +1,43 @@ +import { EventId, ThreadId, TurnId, type ProviderRuntimeEvent } from "@okcode/contracts"; +import { it } from "@effect/vitest"; +import { describe, expect } from "vitest"; +import { Effect, Layer, Stream } from "effect"; + +import { ProviderRuntimeEventFeedLive } from "./ProviderRuntimeEventFeed.ts"; +import { ProviderRuntimeEventFeed } from "../Services/ProviderRuntimeEventFeed.ts"; + +function makeTurnStartedEvent(id: string): ProviderRuntimeEvent { + return { + type: "turn.started", + eventId: EventId.makeUnsafe(id), + provider: "codex", + threadId: ThreadId.makeUnsafe("thread-1"), + turnId: TurnId.makeUnsafe(`turn-${id}`), + createdAt: "2026-01-01T00:00:00.000Z", + }; +} + +describe("ProviderRuntimeEventFeedLive", () => { + it.effect("replays buffered events to late subscribers before live delivery", () => + Effect.gen(function* () { + const feed = yield* ProviderRuntimeEventFeed; + + yield* feed.publish(makeTurnStartedEvent("evt-1")); + yield* feed.publish(makeTurnStartedEvent("evt-2")); + + const events = yield* Stream.take(feed.subscribeWithReplay(), 3).pipe( + Stream.runCollect, + Effect.fork, + ); + + yield* feed.publish(makeTurnStartedEvent("evt-3")); + + const collected = yield* Effect.fromFiber(events); + expect(Array.from(collected).map((event) => event.eventId)).toEqual([ + "evt-1", + "evt-2", + "evt-3", + ]); + }).pipe(Effect.provide(Layer.mergeAll(ProviderRuntimeEventFeedLive))), + ); +}); diff --git a/apps/server/src/provider/Layers/ProviderRuntimeEventFeed.ts b/apps/server/src/provider/Layers/ProviderRuntimeEventFeed.ts new file mode 100644 index 000000000..341c308a5 --- /dev/null +++ b/apps/server/src/provider/Layers/ProviderRuntimeEventFeed.ts @@ -0,0 +1,98 @@ +import type { ProviderRuntimeEvent } from "@okcode/contracts"; +import { Effect, Layer, Queue, Ref, Scope, Stream } from "effect"; +import * as Semaphore from "effect/Semaphore"; + +import { + ProviderRuntimeEventFeed, + type ProviderRuntimeEventFeedShape, +} from "../Services/ProviderRuntimeEventFeed.ts"; + +const PROVIDER_RUNTIME_EVENT_REPLAY_CAPACITY = 256; + +interface FeedState { + readonly buffer: ReadonlyArray; + readonly subscribers: ReadonlySet>; +} + +const appendToReplayBuffer = ( + buffer: ReadonlyArray, + event: ProviderRuntimeEvent, +): ReadonlyArray => { + if (buffer.length < PROVIDER_RUNTIME_EVENT_REPLAY_CAPACITY) { + return [...buffer, event]; + } + return [...buffer.slice(1), event]; +}; + +const makeProviderRuntimeEventFeed = Effect.gen(function* () { + const stateRef = yield* Ref.make({ + buffer: [], + subscribers: new Set>(), + }); + const mutex = yield* Semaphore.make(1); + + const publish: ProviderRuntimeEventFeedShape["publish"] = (event) => + mutex.withPermits(1)( + Effect.gen(function* () { + const subscribers = yield* Ref.modify(stateRef, (state) => { + const nextState: FeedState = { + buffer: appendToReplayBuffer(state.buffer, event), + subscribers: state.subscribers, + }; + return [Array.from(state.subscribers), nextState] as const; + }); + yield* Effect.forEach(subscribers, (subscriber) => Queue.offer(subscriber, event), { + discard: true, + }); + }), + ); + + const subscribeWithReplay: ProviderRuntimeEventFeedShape["subscribeWithReplay"] = () => + Stream.unwrapScoped( + Effect.gen(function* () { + const subscriber = yield* Queue.unbounded(); + const replay = yield* mutex.withPermits(1)( + Ref.modify(stateRef, (state) => { + const subscribers = new Set(state.subscribers); + subscribers.add(subscriber); + return [ + state.buffer, + { + buffer: state.buffer, + subscribers, + } satisfies FeedState, + ] as const; + }), + ); + + yield* Effect.forEach(replay, (event) => Queue.offer(subscriber, event), { + discard: true, + }); + + yield* Scope.addFinalizer(() => + mutex.withPermits(1)( + Ref.update(stateRef, (state) => { + const subscribers = new Set(state.subscribers); + subscribers.delete(subscriber); + return { + buffer: state.buffer, + subscribers, + } satisfies FeedState; + }), + ), + ); + + return Stream.fromQueue(subscriber); + }), + ); + + return { + publish, + subscribeWithReplay, + } satisfies ProviderRuntimeEventFeedShape; +}); + +export const ProviderRuntimeEventFeedLive = Layer.effect( + ProviderRuntimeEventFeed, + makeProviderRuntimeEventFeed, +); diff --git a/apps/server/src/provider/Layers/ProviderService.test.ts b/apps/server/src/provider/Layers/ProviderService.test.ts index a639b956c..53f16a9c5 100644 --- a/apps/server/src/provider/Layers/ProviderService.test.ts +++ b/apps/server/src/provider/Layers/ProviderService.test.ts @@ -33,6 +33,7 @@ import type { ProviderAdapterShape } from "../Services/ProviderAdapter.ts"; import { ProviderAdapterRegistry } from "../Services/ProviderAdapterRegistry.ts"; import { ProviderService } from "../Services/ProviderService.ts"; import { ProviderSessionDirectory } from "../Services/ProviderSessionDirectory.ts"; +import { ProviderRuntimeEventFeedLive } from "./ProviderRuntimeEventFeed.ts"; import { makeProviderServiceLive } from "./ProviderService.ts"; import { ProviderSessionDirectoryLive } from "./ProviderSessionDirectory.ts"; import * as NodeServices from "@effect/platform-node/NodeServices"; @@ -250,6 +251,7 @@ function makeProviderServiceLayer() { makeProviderServiceLive().pipe( Layer.provide(providerAdapterLayer), Layer.provide(directoryLayer), + Layer.provide(ProviderRuntimeEventFeedLive), ), directoryLayer, @@ -297,6 +299,7 @@ it.effect("ProviderServiceLive keeps persisted resumable sessions on startup", ( const providerLayer = makeProviderServiceLive().pipe( Layer.provide(Layer.succeed(ProviderAdapterRegistry, registry)), Layer.provide(directoryLayer), + Layer.provide(ProviderRuntimeEventFeedLive), ); yield* Effect.gen(function* () { @@ -355,6 +358,7 @@ it.effect( const firstProviderLayer = makeProviderServiceLive().pipe( Layer.provide(Layer.succeed(ProviderAdapterRegistry, firstRegistry)), Layer.provide(firstDirectoryLayer), + Layer.provide(ProviderRuntimeEventFeedLive), ); const updatedResumeCursor = { threadId: asThreadId("thread-1"), @@ -405,6 +409,7 @@ it.effect( const secondProviderLayer = makeProviderServiceLive().pipe( Layer.provide(Layer.succeed(ProviderAdapterRegistry, secondRegistry)), Layer.provide(secondDirectoryLayer), + Layer.provide(ProviderRuntimeEventFeedLive), ); secondCodex.startSession.mockClear(); @@ -760,6 +765,7 @@ routing.layer("ProviderServiceLive routing", (it) => { const firstProviderLayer = makeProviderServiceLive().pipe( Layer.provide(Layer.succeed(ProviderAdapterRegistry, firstRegistry)), Layer.provide(firstDirectoryLayer), + Layer.provide(ProviderRuntimeEventFeedLive), ); const initial = yield* Effect.gen(function* () { @@ -791,6 +797,7 @@ routing.layer("ProviderServiceLive routing", (it) => { const secondProviderLayer = makeProviderServiceLive().pipe( Layer.provide(Layer.succeed(ProviderAdapterRegistry, secondRegistry)), Layer.provide(secondDirectoryLayer), + Layer.provide(ProviderRuntimeEventFeedLive), ); secondClaude.startSession.mockClear(); diff --git a/apps/server/src/provider/Layers/ProviderService.ts b/apps/server/src/provider/Layers/ProviderService.ts index ec1855f29..bbc11a745 100644 --- a/apps/server/src/provider/Layers/ProviderService.ts +++ b/apps/server/src/provider/Layers/ProviderService.ts @@ -25,6 +25,7 @@ import { Effect, Layer, Option, PubSub, Queue, Schema, SchemaIssue, Stream } fro import { ProviderValidationError } from "../Errors.ts"; import { ProviderAdapterRegistry } from "../Services/ProviderAdapterRegistry.ts"; +import { ProviderRuntimeEventFeed } from "../Services/ProviderRuntimeEventFeed.ts"; import { ProviderService, type ProviderServiceShape } from "../Services/ProviderService.ts"; import { ProviderSessionDirectory, @@ -153,6 +154,7 @@ const makeProviderService = (options?: ProviderServiceLiveOptions) => : undefined); const registry = yield* ProviderAdapterRegistry; + const runtimeEventFeed = yield* ProviderRuntimeEventFeed; const directory = yield* ProviderSessionDirectory; const runtimeEventQueue = yield* Queue.unbounded(); const runtimeEventPubSub = yield* PubSub.unbounded(); @@ -162,6 +164,7 @@ const makeProviderService = (options?: ProviderServiceLiveOptions) => Effect.tap((canonicalEvent) => canonicalEventLogger ? canonicalEventLogger.write(canonicalEvent, null) : Effect.void, ), + Effect.tap((canonicalEvent) => runtimeEventFeed.publish(canonicalEvent)), Effect.flatMap((canonicalEvent) => PubSub.publish(runtimeEventPubSub, canonicalEvent)), Effect.asVoid, ); diff --git a/apps/server/src/provider/Services/ProviderRuntimeEventFeed.ts b/apps/server/src/provider/Services/ProviderRuntimeEventFeed.ts new file mode 100644 index 000000000..bfbcf2e5f --- /dev/null +++ b/apps/server/src/provider/Services/ProviderRuntimeEventFeed.ts @@ -0,0 +1,13 @@ +import type { ProviderRuntimeEvent } from "@okcode/contracts"; +import { ServiceMap } from "effect"; +import type { Effect, Stream } from "effect"; + +export interface ProviderRuntimeEventFeedShape { + readonly publish: (event: ProviderRuntimeEvent) => Effect.Effect; + readonly subscribeWithReplay: () => Stream.Stream; +} + +export class ProviderRuntimeEventFeed extends ServiceMap.Service< + ProviderRuntimeEventFeed, + ProviderRuntimeEventFeedShape +>()("okcode/provider/Services/ProviderRuntimeEventFeed") {} diff --git a/apps/server/src/serverLayers.ts b/apps/server/src/serverLayers.ts index 7750a2cee..f4e5a8c57 100644 --- a/apps/server/src/serverLayers.ts +++ b/apps/server/src/serverLayers.ts @@ -27,6 +27,7 @@ import { ProviderHealthLive } from "./provider/Layers/ProviderHealth"; import { ProviderAdapterRegistryLive } from "./provider/Layers/ProviderAdapterRegistry"; import { makeProviderServiceLive } from "./provider/Layers/ProviderService"; import { ProviderSessionDirectoryLive } from "./provider/Layers/ProviderSessionDirectory"; +import { ProviderRuntimeEventFeed } from "./provider/Services/ProviderRuntimeEventFeed"; import { ProviderService } from "./provider/Services/ProviderService"; import { makeEventNdjsonLogger } from "./provider/Layers/EventNdjsonLogger"; import { EnvironmentVariablesLive } from "./persistence/Services/EnvironmentVariables"; @@ -72,7 +73,7 @@ const makeRuntimePtyAdapterLayer = () => export function makeServerProviderLayer(): Layer.Layer< ProviderService, ProviderUnsupportedError, - SqlClient.SqlClient | ServerConfig | FileSystem.FileSystem + SqlClient.SqlClient | ServerConfig | FileSystem.FileSystem | ProviderRuntimeEventFeed > { return Effect.gen(function* () { const { providerEventLogPath } = yield* ServerConfig; diff --git a/apps/server/src/wsServer.test.ts b/apps/server/src/wsServer.test.ts index 54d55ce04..c3fcaaeb6 100644 --- a/apps/server/src/wsServer.test.ts +++ b/apps/server/src/wsServer.test.ts @@ -48,6 +48,7 @@ import { makeSqlitePersistenceLive, SqlitePersistenceMemory } from "./persistenc import { SqlClient, SqlError } from "effect/unstable/sql"; import { ProviderService, type ProviderServiceShape } from "./provider/Services/ProviderService"; import { ProviderHealth, type ProviderHealthShape } from "./provider/Services/ProviderHealth"; +import { ProviderRuntimeEventFeedLive } from "./provider/Layers/ProviderRuntimeEventFeed"; import { Open, type OpenShape } from "./open"; import { GitManager, type GitManagerShape } from "./git/Services/GitManager.ts"; import type { GitCoreShape } from "./git/Services/GitCore.ts"; @@ -527,6 +528,7 @@ describe("WebSocket Server", () => { const scope = await Effect.runPromise(Scope.make("sequential")); const persistenceLayer = options.persistenceLayer ?? SqlitePersistenceMemory; const providerLayer = options.providerLayer ?? makeServerProviderLayer(); + const providerRuntimeEventFeedLayer = ProviderRuntimeEventFeedLive; const providerHealthLayer = Layer.succeed( ProviderHealth, options.providerHealth ?? defaultProviderHealthService, @@ -546,7 +548,10 @@ describe("WebSocket Server", () => { autoBootstrapProjectFromCwd: options.autoBootstrapProjectFromCwd ?? false, logWebSocketEvents: options.logWebSocketEvents ?? Boolean(options.devUrl), } satisfies ServerConfigShape); - const infrastructureLayer = providerLayer.pipe(Layer.provideMerge(persistenceLayer)); + const infrastructureLayer = providerLayer.pipe( + Layer.provideMerge(persistenceLayer), + Layer.provideMerge(providerRuntimeEventFeedLayer), + ); const projectionSnapshotQueryLayer = options.projectionSnapshotQuery ? Layer.succeed(ProjectionSnapshotQuery, options.projectionSnapshotQuery) : OrchestrationProjectionSnapshotQueryLive; @@ -571,6 +576,7 @@ describe("WebSocket Server", () => { const dependenciesLayer = Layer.empty.pipe( Layer.provideMerge(runtimeLayer), Layer.provideMerge(providerHealthLayer), + Layer.provideMerge(providerRuntimeEventFeedLayer), Layer.provideMerge(openLayer), Layer.provideMerge(serverConfigLayer), Layer.provideMerge(NodeServices.layer), @@ -893,6 +899,62 @@ describe("WebSocket Server", () => { expectAvailableEditors((response.result as { availableEditors: unknown }).availableEditors); }); + it("refreshes provider statuses on each server.getConfig call", async () => { + const { cwd } = makeWorkspaceFixture("workspace"); + let callCount = 0; + const providerHealth: ProviderHealthShape = { + getStatuses: Effect.sync(() => { + callCount += 1; + return [ + { + provider: "codex", + status: callCount === 1 ? "ready" : "error", + available: callCount === 1, + authStatus: callCount === 1 ? "authenticated" : "unauthenticated", + checkedAt: `2026-01-01T00:00:0${callCount}.000Z`, + }, + ] satisfies ReadonlyArray; + }), + }; + + server = await createTestServer({ cwd, providerHealth }); + const addr = server.address(); + const port = typeof addr === "object" && addr !== null ? addr.port : 0; + + const [ws] = await connectAndAwaitWelcome(port); + connections.push(ws); + + const firstResponse = await sendRequest(ws, WS_METHODS.serverGetConfig); + const secondResponse = await sendRequest(ws, WS_METHODS.serverGetConfig); + + expect(firstResponse.result).toEqual( + expect.objectContaining({ + providers: [ + { + provider: "codex", + status: "ready", + available: true, + authStatus: "authenticated", + checkedAt: "2026-01-01T00:00:01.000Z", + }, + ], + }), + ); + expect(secondResponse.result).toEqual( + expect.objectContaining({ + providers: [ + { + provider: "codex", + status: "error", + available: false, + authStatus: "unauthenticated", + checkedAt: "2026-01-01T00:00:02.000Z", + }, + ], + }), + ); + }); + it("bootstraps default keybindings file when missing", async () => { const baseDir = makeTempDir("okcode-state-bootstrap-keybindings-"); const { keybindingsConfigPath: keybindingsPath } = deriveServerPathsSync(baseDir, undefined); @@ -1055,6 +1117,59 @@ describe("WebSocket Server", () => { expect(successPush.data).toEqual({ issues: [], providers: defaultProviderStatuses }); }); + it("falls back to last known provider statuses when a later refresh fails", async () => { + const baseDir = makeTempDir("okcode-state-provider-status-fallback-"); + const { keybindingsConfigPath: keybindingsPath } = deriveServerPathsSync(baseDir, undefined); + ensureParentDir(keybindingsPath); + fs.writeFileSync(keybindingsPath, "[]", "utf8"); + + const { cwd } = makeWorkspaceFixture("workspace"); + let callCount = 0; + const liveStatuses: ReadonlyArray = [ + { + provider: "codex", + status: "ready", + available: true, + authStatus: "authenticated", + checkedAt: "2026-01-01T00:00:01.000Z", + }, + ]; + const providerHealth: ProviderHealthShape = { + getStatuses: Effect.sync(() => { + callCount += 1; + if (callCount === 1) { + return liveStatuses; + } + throw new Error("provider health probe failed"); + }), + }; + + server = await createTestServer({ cwd, baseDir, providerHealth }); + const addr = server.address(); + const port = typeof addr === "object" && addr !== null ? addr.port : 0; + + const [ws] = await connectAndAwaitWelcome(port); + connections.push(ws); + + const firstResponse = await sendRequest(ws, WS_METHODS.serverGetConfig); + expect(firstResponse.result).toEqual(expect.objectContaining({ providers: liveStatuses })); + + const malformedPush = await rewriteKeybindingsAndWaitForPush( + ws, + keybindingsPath, + "{ not-json", + (push) => + Array.isArray(push.data.issues) && + Boolean(push.data.issues[0]) && + push.data.issues[0]!.kind === "keybindings.malformed-config", + ); + + expect(malformedPush.data).toEqual({ + issues: [{ kind: "keybindings.malformed-config", message: expect.any(String) }], + providers: liveStatuses, + }); + }); + it("routes shell.openInEditor through the injected open service", async () => { const { cwd } = makeWorkspaceFixture("workspace"); const openCalls: Array<{ cwd: string; editor: string }> = []; diff --git a/apps/server/src/wsServer.ts b/apps/server/src/wsServer.ts index 066c855b0..20a7483df 100644 --- a/apps/server/src/wsServer.ts +++ b/apps/server/src/wsServer.ts @@ -386,7 +386,7 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return< ), ); - const providerStatuses = yield* providerHealth.getStatuses; + let lastKnownProviderStatuses = yield* providerHealth.getStatuses; const clients = yield* Ref.make(new Set()); const logger = createLogger("ws"); @@ -488,7 +488,23 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return< const pushBus = yield* makeServerPushBus({ clients, logOutgoingPush, + logDeliveryFailure: (input) => { + logger.warn("failed to deliver websocket push", input); + }, }); + const getProviderStatuses = () => + providerHealth.getStatuses.pipe( + Effect.tap((statuses) => + Effect.sync(() => { + lastKnownProviderStatuses = statuses; + }), + ), + Effect.catch((cause) => + Effect.logWarning("failed to refresh provider statuses", { + cause, + }).pipe(Effect.as(lastKnownProviderStatuses)), + ), + ); yield* readiness.markPushBusReady; yield* keybindingsManager.start.pipe( Effect.mapError( @@ -854,18 +870,23 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return< ).pipe(Effect.forkIn(subscriptionsScope)); yield* Stream.runForEach(keybindingsManager.streamChanges, (event) => - pushBus.publishAll(WS_CHANNELS.serverConfigUpdated, { - issues: event.issues, - providers: providerStatuses, - }), + getProviderStatuses().pipe( + Effect.flatMap((providers) => + pushBus.publishAll(WS_CHANNELS.serverConfigUpdated, { + issues: event.issues, + providers, + }), + ), + ), ).pipe(Effect.forkIn(subscriptionsScope)); const publishServerConfigUpdated = () => Effect.gen(function* () { const keybindingsConfig = yield* keybindingsManager.loadConfigState; + const providers = yield* getProviderStatuses(); yield* pushBus.publishAll(WS_CHANNELS.serverConfigUpdated, { issues: keybindingsConfig.issues, - providers: providerStatuses, + providers, }); }); @@ -1579,12 +1600,13 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return< case WS_METHODS.serverGetConfig: const keybindingsConfig = yield* keybindingsManager.loadConfigState; + const providers = yield* getProviderStatuses(); return { cwd, keybindingsConfigPath, keybindings: keybindingsConfig.keybindings, issues: keybindingsConfig.issues, - providers: providerStatuses, + providers, availableEditors, buildInfo: serverBuildInfo, }; diff --git a/apps/server/src/wsServer/pushBus.test.ts b/apps/server/src/wsServer/pushBus.test.ts index 59e31ad28..8993300db 100644 --- a/apps/server/src/wsServer/pushBus.test.ts +++ b/apps/server/src/wsServer/pushBus.test.ts @@ -12,9 +12,13 @@ class MockWebSocket { readonly OPEN = MockWebSocket.OPEN; readyState = MockWebSocket.OPEN; readonly sent: string[] = []; + throwOnSend = false; private readonly waiters = new Set<() => void>(); send(message: string) { + if (this.throwOnSend) { + throw new Error("send failed"); + } this.sent.push(message); for (const waiter of this.waiters) { waiter(); @@ -49,6 +53,7 @@ describe("makeServerPushBus", () => { const pushBus = yield* makeServerPushBus({ clients, logOutgoingPush: () => {}, + logDeliveryFailure: () => {}, }); yield* pushBus.publishAll(WS_CHANNELS.serverConfigUpdated, { @@ -101,4 +106,60 @@ describe("makeServerPushBus", () => { }), ), ); + + it.live("continues broadcasting when one client send throws", () => + Effect.scoped( + Effect.gen(function* () { + const failingClient = new MockWebSocket(); + failingClient.throwOnSend = true; + const healthyClient = new MockWebSocket(); + const clients = yield* Ref.make( + new Set([ + failingClient as unknown as WebSocket, + healthyClient as unknown as WebSocket, + ]), + ); + const pushBus = yield* makeServerPushBus({ + clients, + logOutgoingPush: () => {}, + logDeliveryFailure: () => {}, + }); + + yield* pushBus.publishAll(WS_CHANNELS.serverConfigUpdated, { + issues: [], + providers: [], + }); + + yield* Effect.promise(() => healthyClient.waitForSentCount(1)); + expect(healthyClient.sent).toHaveLength(1); + expect(failingClient.sent).toHaveLength(0); + }), + ), + ); + + it.live("returns false when a targeted client send throws", () => + Effect.scoped( + Effect.gen(function* () { + const client = new MockWebSocket(); + client.throwOnSend = true; + const clients = yield* Ref.make(new Set()); + const pushBus = yield* makeServerPushBus({ + clients, + logOutgoingPush: () => {}, + logDeliveryFailure: () => {}, + }); + + const delivered = yield* pushBus.publishClient( + client as unknown as WebSocket, + WS_CHANNELS.serverWelcome, + { + cwd: "/tmp/project", + projectName: "project", + }, + ); + + expect(delivered).toBe(false); + }), + ), + ); }); diff --git a/apps/server/src/wsServer/pushBus.ts b/apps/server/src/wsServer/pushBus.ts index 6f9a397e1..715fb4319 100644 --- a/apps/server/src/wsServer/pushBus.ts +++ b/apps/server/src/wsServer/pushBus.ts @@ -34,6 +34,13 @@ export interface ServerPushBus { export const makeServerPushBus = (input: { readonly clients: Ref.Ref>; readonly logOutgoingPush: (push: WsPushEnvelopeBase, recipients: number) => void; + readonly logDeliveryFailure: (input: { + readonly channel: WsPushChannel; + readonly sequence: number; + readonly target: PushTarget["kind"]; + readonly readyState: number; + readonly error: unknown; + }) => void; }): Effect.Effect => Effect.gen(function* () { const nextSequence = yield* Ref.make(0); @@ -63,8 +70,18 @@ export const makeServerPushBus = (input: { if (client.readyState !== client.OPEN) { continue; } - client.send(message); - recipientCount += 1; + try { + client.send(message); + recipientCount += 1; + } catch (error) { + input.logDeliveryFailure({ + channel: job.channel, + sequence, + target: job.target.kind, + readyState: client.readyState, + error, + }); + } } input.logOutgoingPush(push, recipientCount); diff --git a/apps/web/src/lib/connectionSync.ts b/apps/web/src/lib/connectionSync.ts index 6ce67d6cf..b460c7339 100644 --- a/apps/web/src/lib/connectionSync.ts +++ b/apps/web/src/lib/connectionSync.ts @@ -15,10 +15,12 @@ import { projectQueryKeys } from "./projectReactQuery"; import { serverQueryKeys } from "./serverReactQuery"; import { prReviewQueryKeys } from "./prReviewReactQuery"; import { skillQueryKeys } from "./skillReactQuery"; -import type { WsTransport } from "../wsTransport"; +export interface ReconnectableTransport { + readonly onReconnected: (listener: () => void) => () => void; +} export interface ConnectionSyncManagerOptions { - transport: WsTransport; + transport: ReconnectableTransport; queryClient: QueryClient; /** * Called after the query caches have been invalidated so the diff --git a/apps/web/src/lib/snapshotSyncManager.test.ts b/apps/web/src/lib/snapshotSyncManager.test.ts new file mode 100644 index 000000000..d2ed06e9e --- /dev/null +++ b/apps/web/src/lib/snapshotSyncManager.test.ts @@ -0,0 +1,98 @@ +import type { OrchestrationReadModel } from "@okcode/contracts"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import { createSnapshotSyncManager } from "./snapshotSyncManager"; + +function makeSnapshot(sequence: number): OrchestrationReadModel { + return { + snapshotSequence: sequence, + updatedAt: "2026-01-01T00:00:00.000Z", + projects: [], + threads: [], + }; +} + +describe("createSnapshotSyncManager", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + vi.restoreAllMocks(); + }); + + it("retries after a failed fetch until a later sync succeeds", async () => { + const fetchSnapshot = vi + .fn<() => Promise>() + .mockRejectedValueOnce(new Error("offline")) + .mockResolvedValueOnce(makeSnapshot(2)); + const applySnapshot = vi.fn(); + const manager = createSnapshotSyncManager({ + fetchSnapshot, + applySnapshot, + }); + + void manager.scheduleSync(); + await Promise.resolve(); + + expect(fetchSnapshot).toHaveBeenCalledTimes(1); + expect(applySnapshot).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(250); + + expect(fetchSnapshot).toHaveBeenCalledTimes(2); + expect(applySnapshot).toHaveBeenCalledTimes(1); + expect(applySnapshot).toHaveBeenCalledWith(makeSnapshot(2)); + }); + + it("coalesces overlapping sync requests and reruns once after success", async () => { + let resolveFetch: ((snapshot: OrchestrationReadModel) => void) | null = null; + const fetchSnapshot = vi + .fn<() => Promise>() + .mockImplementation( + () => + new Promise((resolve) => { + resolveFetch = resolve; + }), + ) + .mockResolvedValueOnce(makeSnapshot(2)); + const applySnapshot = vi.fn(); + const manager = createSnapshotSyncManager({ + fetchSnapshot, + applySnapshot, + }); + + const firstSync = manager.scheduleSync(); + const secondSync = manager.scheduleSync(); + + expect(fetchSnapshot).toHaveBeenCalledTimes(1); + expect(firstSync).toBe(secondSync); + + resolveFetch?.(makeSnapshot(1)); + await firstSync; + await Promise.resolve(); + + expect(fetchSnapshot).toHaveBeenCalledTimes(2); + expect(applySnapshot.mock.calls).toEqual([[makeSnapshot(1)], [makeSnapshot(2)]]); + }); + + it("stops retrying after dispose", async () => { + const fetchSnapshot = vi + .fn<() => Promise>() + .mockRejectedValue(new Error("offline")); + const applySnapshot = vi.fn(); + const manager = createSnapshotSyncManager({ + fetchSnapshot, + applySnapshot, + }); + + void manager.scheduleSync(); + await Promise.resolve(); + manager.dispose(); + await vi.advanceTimersByTimeAsync(5_000); + + expect(fetchSnapshot).toHaveBeenCalledTimes(1); + expect(applySnapshot).not.toHaveBeenCalled(); + }); +}); diff --git a/apps/web/src/lib/snapshotSyncManager.ts b/apps/web/src/lib/snapshotSyncManager.ts new file mode 100644 index 000000000..7892419b8 --- /dev/null +++ b/apps/web/src/lib/snapshotSyncManager.ts @@ -0,0 +1,101 @@ +import type { OrchestrationReadModel } from "@okcode/contracts"; + +const SNAPSHOT_RETRY_DELAYS_MS = [250, 500, 1_000, 2_000, 5_000] as const; + +export interface SnapshotSyncManagerOptions { + readonly fetchSnapshot: () => Promise; + readonly applySnapshot: (snapshot: OrchestrationReadModel) => void; +} + +export interface SnapshotSyncManager { + readonly scheduleSync: () => Promise; + readonly dispose: () => void; +} + +export function createSnapshotSyncManager( + options: SnapshotSyncManagerOptions, +): SnapshotSyncManager { + let disposed = false; + let syncing = false; + let pending = false; + let retryAttempt = 0; + let retryTimer: ReturnType | null = null; + let inFlightSync: Promise | null = null; + + const clearRetryTimer = () => { + if (retryTimer !== null) { + clearTimeout(retryTimer); + retryTimer = null; + } + }; + + const scheduleRetry = () => { + if (disposed || retryTimer !== null) { + return; + } + const delay = + SNAPSHOT_RETRY_DELAYS_MS[Math.min(retryAttempt, SNAPSHOT_RETRY_DELAYS_MS.length - 1)] ?? + SNAPSHOT_RETRY_DELAYS_MS[0]; + retryAttempt += 1; + retryTimer = setTimeout(() => { + retryTimer = null; + void runSync(); + }, delay); + }; + + const runSync = async (): Promise => { + if (disposed) { + return; + } + clearRetryTimer(); + if (syncing) { + pending = true; + return; + } + + syncing = true; + pending = false; + + try { + const snapshot = await options.fetchSnapshot(); + if (disposed) { + return; + } + retryAttempt = 0; + options.applySnapshot(snapshot); + } catch { + if (!disposed) { + scheduleRetry(); + } + return; + } finally { + syncing = false; + } + + if (pending && !disposed) { + pending = false; + void runSync(); + } + }; + + return { + scheduleSync: () => { + if (syncing) { + pending = true; + return inFlightSync ?? Promise.resolve(); + } + const sync = runSync().finally(() => { + if (inFlightSync === sync) { + inFlightSync = null; + } + }); + inFlightSync = sync; + return sync; + }, + dispose: () => { + disposed = true; + pending = false; + clearRetryTimer(); + }, + }; +} diff --git a/apps/web/src/routes/__root.tsx b/apps/web/src/routes/__root.tsx index 6c37aec6f..5aa5770df 100644 --- a/apps/web/src/routes/__root.tsx +++ b/apps/web/src/routes/__root.tsx @@ -24,10 +24,9 @@ import { terminalRunningSubprocessFromEvent } from "../terminalActivity"; import { onServerConfigUpdated, onServerWelcome, onTransportReconnected } from "../wsNativeApi"; import { providerQueryKeys } from "../lib/providerReactQuery"; import { projectQueryKeys } from "../lib/projectReactQuery"; -import { gitQueryKeys } from "../lib/gitReactQuery"; -import { prReviewQueryKeys } from "../lib/prReviewReactQuery"; -import { skillQueryKeys } from "../lib/skillReactQuery"; import { collectActiveTerminalThreadIds } from "../lib/terminalStateCleanup"; +import { createConnectionSyncManager } from "../lib/connectionSync"; +import { createSnapshotSyncManager } from "../lib/snapshotSyncManager"; import { OnboardingDialog } from "../components/onboarding/OnboardingDialog"; import { MobileConnectionBanner } from "../components/mobile/MobileConnectionBanner"; import { MobilePairingScreen } from "../components/mobile/MobilePairingScreen"; @@ -201,44 +200,24 @@ function EventRouter() { if (!api) return; let disposed = false; let latestSequence = 0; - let syncing = false; - let pending = false; let needsProviderInvalidation = false; - - const flushSnapshotSync = async (): Promise => { - const snapshot = await api.orchestration.getSnapshot(); - if (disposed) return; - latestSequence = Math.max(latestSequence, snapshot.snapshotSequence); - syncServerReadModel(snapshot); - clearPromotedDraftThreads(new Set(snapshot.threads.map((t) => t.id))); - const draftThreadIds = Object.keys( - useComposerDraftStore.getState().draftThreadsByThreadId, - ) as ThreadId[]; - const activeThreadIds = collectActiveTerminalThreadIds({ - snapshotThreads: snapshot.threads, - draftThreadIds, - }); - removeOrphanedTerminalStates(activeThreadIds); - if (pending) { - pending = false; - await flushSnapshotSync(); - } - }; - - const syncSnapshot = async () => { - if (syncing) { - pending = true; - return; - } - syncing = true; - pending = false; - try { - await flushSnapshotSync(); - } catch { - // Keep prior state and wait for next domain event to trigger a resync. - } - syncing = false; - }; + const snapshotSync = createSnapshotSyncManager({ + fetchSnapshot: () => api.orchestration.getSnapshot(), + applySnapshot: (snapshot) => { + if (disposed) return; + latestSequence = Math.max(latestSequence, snapshot.snapshotSequence); + syncServerReadModel(snapshot); + clearPromotedDraftThreads(new Set(snapshot.threads.map((t) => t.id))); + const draftThreadIds = Object.keys( + useComposerDraftStore.getState().draftThreadsByThreadId, + ) as ThreadId[]; + const activeThreadIds = collectActiveTerminalThreadIds({ + snapshotThreads: snapshot.threads, + draftThreadIds, + }); + removeOrphanedTerminalStates(activeThreadIds); + }, + }); const domainEventFlushThrottler = new Throttler( () => { @@ -249,7 +228,7 @@ function EventRouter() { // reflects files created, deleted, or restored during this turn. void queryClient.invalidateQueries({ queryKey: projectQueryKeys.all }); } - void syncSnapshot(); + snapshotSync.scheduleSync(); }, { wait: 100, @@ -284,7 +263,7 @@ function EventRouter() { }); const unsubWelcome = onServerWelcome((payload) => { void (async () => { - await syncSnapshot(); + await snapshotSync.scheduleSync(); if (disposed) { return; } @@ -308,24 +287,16 @@ function EventRouter() { handledBootstrapThreadIdRef.current = payload.bootstrapThreadId; })().catch(() => undefined); }); - // ── Reconnection sync ────────────────────────────────────────────── - // When the WebSocket re-opens after a network interruption, invalidate - // all query caches and re-fetch the orchestration snapshot so the UI - // converges back to the server's truth. const unsubReconnected = onTransportReconnected(() => { - // Reset the sequence tracker so replayed domain events are accepted. latestSequence = 0; - - // Invalidate all domain query caches. - void queryClient.invalidateQueries({ queryKey: gitQueryKeys.all }); - void queryClient.invalidateQueries({ queryKey: providerQueryKeys.all }); - void queryClient.invalidateQueries({ queryKey: projectQueryKeys.all }); - void queryClient.invalidateQueries({ queryKey: serverQueryKeys.all }); - void queryClient.invalidateQueries({ queryKey: prReviewQueryKeys.all }); - void queryClient.invalidateQueries({ queryKey: skillQueryKeys.all }); - - // Trigger a full snapshot sync. - void syncSnapshot(); + }); + const unsubConnectionSync = createConnectionSyncManager({ + transport: { onReconnected: onTransportReconnected }, + queryClient, + onResync: () => { + latestSequence = 0; + snapshotSync.scheduleSync(); + }, }); // onServerConfigUpdated replays the latest cached value synchronously @@ -377,11 +348,13 @@ function EventRouter() { return () => { disposed = true; needsProviderInvalidation = false; + snapshotSync.dispose(); domainEventFlushThrottler.cancel(); unsubDomainEvent(); unsubTerminalEvent(); unsubWelcome(); unsubReconnected(); + unsubConnectionSync(); unsubServerConfigUpdated(); }; }, [ diff --git a/apps/web/src/wsNativeApi.test.ts b/apps/web/src/wsNativeApi.test.ts index a5e129f87..c27a7952a 100644 --- a/apps/web/src/wsNativeApi.test.ts +++ b/apps/web/src/wsNativeApi.test.ts @@ -53,6 +53,13 @@ const subscribeStateMock = vi.fn<(listener: (state: string) => void) => () => vo return () => {}; }); const getStateMock = vi.fn<() => string>(() => "connecting"); +const reconnectedListeners = new Set<() => void>(); +const onReconnectedMock = vi.fn<(listener: () => void) => () => void>((listener) => { + reconnectedListeners.add(listener); + return () => { + reconnectedListeners.delete(listener); + }; +}); vi.mock("./wsTransport", () => { return { @@ -61,6 +68,7 @@ vi.mock("./wsTransport", () => { subscribe = subscribeMock; subscribeState = subscribeStateMock; getState = getStateMock; + onReconnected = onReconnectedMock; getLatestPush(channel: string) { return latestPushByChannel.get(channel) ?? null; } @@ -116,8 +124,10 @@ beforeEach(() => { subscribeMock.mockClear(); subscribeStateMock.mockClear(); getStateMock.mockClear(); + onReconnectedMock.mockClear(); channelListeners.clear(); latestPushByChannel.clear(); + reconnectedListeners.clear(); nextPushSequence = 1; Reflect.deleteProperty(getWindowForTest(), "desktopBridge"); }); @@ -242,6 +252,26 @@ describe("wsNativeApi", () => { }); }); + it("registers reconnect listeners that were added before transport creation", async () => { + const { createWsNativeApi, onTransportReconnected } = await import("./wsNativeApi"); + const listener = vi.fn(); + + const unsubscribe = onTransportReconnected(listener); + expect(onReconnectedMock).not.toHaveBeenCalled(); + + createWsNativeApi(); + expect(onReconnectedMock).toHaveBeenCalledTimes(1); + + for (const reconnectListener of reconnectedListeners) { + reconnectListener(); + } + + expect(listener).toHaveBeenCalledTimes(1); + + unsubscribe(); + expect(reconnectedListeners.size).toBe(0); + }); + it("forwards valid terminal and orchestration events", async () => { const { createWsNativeApi } = await import("./wsNativeApi"); diff --git a/apps/web/src/wsNativeApi.ts b/apps/web/src/wsNativeApi.ts index efa8351aa..f9d9219a6 100644 --- a/apps/web/src/wsNativeApi.ts +++ b/apps/web/src/wsNativeApi.ts @@ -30,6 +30,17 @@ const prReviewRepoConfigUpdatedListeners = new Set< const projectFileTreeChangedListeners = new Set<(payload: ProjectFileTreeChangedPayload) => void>(); const smeMessageEventListeners = new Set<(event: SmeMessageEvent) => void>(); const transportStateListeners = new Set<(state: TransportState) => void>(); +const pendingReconnectedListeners = new Set<() => void>(); +const activeReconnectedUnsubscribers = new Map<() => void, () => void>(); + +function registerDeferredReconnectedListeners(transport: WsTransport): void { + for (const listener of pendingReconnectedListeners) { + if (activeReconnectedUnsubscribers.has(listener)) { + continue; + } + activeReconnectedUnsubscribers.set(listener, transport.onReconnected(listener)); + } +} /** * Subscribe to the server welcome message. If a welcome was already received @@ -113,11 +124,14 @@ export function onTransportReconnected(listener: () => void): () => void { return instance.transport.onReconnected(listener); } - // Transport not ready yet – defer until it exists. - const reconnectedListeners = new Set<() => void>(); - reconnectedListeners.add(listener); + pendingReconnectedListeners.add(listener); return () => { - reconnectedListeners.delete(listener); + pendingReconnectedListeners.delete(listener); + const unsubscribe = activeReconnectedUnsubscribers.get(listener); + if (unsubscribe) { + activeReconnectedUnsubscribers.delete(listener); + unsubscribe(); + } }; } @@ -125,6 +139,7 @@ export function createWsNativeApi(): NativeApi { if (instance) return instance.api; const transport = new WsTransport(); + registerDeferredReconnectedListeners(transport); // Initialize mobile push notifications when running in the mobile shell. initMobileNotifications(transport); diff --git a/bun.lock b/bun.lock index 68ac61870..618efb676 100644 --- a/bun.lock +++ b/bun.lock @@ -2489,7 +2489,6 @@ "next/postcss": ["postcss@8.4.31", "", { "dependencies": { "nanoid": "^3.3.6", "picocolors": "^1.0.0", "source-map-js": "^1.0.2" } }, "sha512-PS08Iboia9mts/2ygV3eLpY5ghnUcfLV/EXTOW1E2qYxJKGGBUtNjN76FYHnMs36RmARn41bC0AZmn+rR0OVpQ=="], - "parse-entities/@types/unist": ["@types/unist@2.0.11", "", {}, "sha512-CmBKiL6NNo/OqgmMn95Fk9Whlp2mtvIv+KNpQKN2F4SjvrEesubTRWGYSg+BnWZOnlCaSTU1sMpsBOzgbYhnsA=="], "path-scurry/lru-cache": ["lru-cache@11.2.7", "", {}, "sha512-aY/R+aEsRelme17KGQa/1ZSIpLpNYYrhcrepKTZgE+W3WM16YMCaPwOHLHsmopZHELU0Ojin1lPVxKR0MihncA=="],