Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/server/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import * as SqlitePersistence from "./persistence/Layers/Sqlite";
import { OpenclawGatewayConfigLive } from "./persistence/Layers/OpenclawGatewayConfig";
import { makeServerProviderLayer, makeServerRuntimeServicesLayer } from "./serverLayers";
import { ProviderHealthLive } from "./provider/Layers/ProviderHealth";
import { ProviderRuntimeEventFeedLive } from "./provider/Layers/ProviderRuntimeEventFeed";
import { Server } from "./wsServer";
import { ServerLoggerLive } from "./serverLogger";
import { doctorCmd } from "./doctor";
Expand Down Expand Up @@ -195,6 +196,7 @@ const LayerLive = (input: CliInput) =>
Layer.empty.pipe(
Layer.provideMerge(makeServerRuntimeServicesLayer()),
Layer.provideMerge(makeServerProviderLayer()),
Layer.provideMerge(ProviderRuntimeEventFeedLive),
Layer.provideMerge(OpenclawGatewayConfigLive),
Layer.provideMerge(ProviderHealthLive),
Layer.provideMerge(SqlitePersistence.layerConfig),
Expand Down
16 changes: 8 additions & 8 deletions apps/server/src/orchestration/Layers/CheckpointReactor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
TurnId,
} from "@okcode/contracts";
import * as NodeServices from "@effect/platform-node/NodeServices";
import { Effect, Exit, Layer, ManagedRuntime, PubSub, Scope, Stream } from "effect";
import { Effect, Exit, Layer, ManagedRuntime, Scope, Stream } from "effect";
import { afterEach, describe, expect, it, vi } from "vitest";

import { CheckpointStoreLive } from "../../checkpointing/Layers/CheckpointStore.ts";
Expand All @@ -36,6 +36,8 @@ import {
ProviderService,
type ProviderServiceShape,
} from "../../provider/Services/ProviderService.ts";
import { ProviderRuntimeEventFeedLive } from "../../provider/Layers/ProviderRuntimeEventFeed.ts";
import { ProviderRuntimeEventFeed } from "../../provider/Services/ProviderRuntimeEventFeed.ts";
import { checkpointRefForThreadTurn } from "../../checkpointing/Utils.ts";
import { ServerConfig } from "../../config.ts";

Expand All @@ -62,7 +64,6 @@ function createProviderServiceHarness(
providerName: ProviderSession["provider"] = "codex",
) {
const now = new Date().toISOString();
const runtimeEventPubSub = Effect.runSync(PubSub.unbounded<ProviderRuntimeEvent>());
const rollbackConversation = vi.fn(
(_input: { readonly threadId: ThreadId; readonly numTurns: number }) => Effect.void,
);
Expand Down Expand Up @@ -93,17 +94,12 @@ function createProviderServiceHarness(
listSessions,
getCapabilities: () => Effect.succeed({ sessionModelSwitch: "in-session" }),
rollbackConversation,
streamEvents: Stream.fromPubSub(runtimeEventPubSub),
};

const emit = (event: LegacyProviderRuntimeEvent): void => {
Effect.runSync(PubSub.publish(runtimeEventPubSub, event as unknown as ProviderRuntimeEvent));
streamEvents: Stream.empty,
};

return {
service,
rollbackConversation,
emit,
};
}

Expand Down Expand Up @@ -267,6 +263,7 @@ describe("CheckpointReactor", () => {
Layer.provideMerge(orchestrationLayer),
Layer.provideMerge(RuntimeReceiptBusLive),
Layer.provideMerge(Layer.succeed(ProviderService, provider.service)),
Layer.provideMerge(ProviderRuntimeEventFeedLive),
Layer.provideMerge(CheckpointStoreLive.pipe(Layer.provide(GitCoreLive))),
Layer.provideMerge(ServerConfigLayer),
Layer.provideMerge(NodeServices.layer),
Expand All @@ -276,6 +273,7 @@ describe("CheckpointReactor", () => {
const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService));
const reactor = await runtime.runPromise(Effect.service(CheckpointReactor));
const checkpointStore = await runtime.runPromise(Effect.service(CheckpointStore));
const eventFeed = await runtime.runPromise(Effect.service(ProviderRuntimeEventFeed));
scope = await Effect.runPromise(Scope.make("sequential"));
await Effect.runPromise(reactor.start.pipe(Scope.provide(scope)));
const drain = () => Effect.runPromise(reactor.drain);
Expand Down Expand Up @@ -335,6 +333,8 @@ describe("CheckpointReactor", () => {
engine,
provider,
cwd,
emit: (event: LegacyProviderRuntimeEvent) =>
Effect.runSync(eventFeed.publish(event as unknown as ProviderRuntimeEvent)),
drain,
};
}
Expand Down
4 changes: 3 additions & 1 deletion apps/server/src/orchestration/Layers/CheckpointReactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
import { clearWorkspaceIndexCache } from "../../workspaceEntries.ts";
import { CheckpointStore } from "../../checkpointing/Services/CheckpointStore.ts";
import { ProviderService } from "../../provider/Services/ProviderService.ts";
import { ProviderRuntimeEventFeed } from "../../provider/Services/ProviderRuntimeEventFeed.ts";
import { CheckpointReactor, type CheckpointReactorShape } from "../Services/CheckpointReactor.ts";
import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts";
import { RuntimeReceiptBus } from "../Services/RuntimeReceiptBus.ts";
Expand Down Expand Up @@ -66,6 +67,7 @@ const serverCommandId = (tag: string): CommandId =>
const make = Effect.gen(function* () {
const orchestrationEngine = yield* OrchestrationEngineService;
const providerService = yield* ProviderService;
const providerRuntimeEventFeed = yield* ProviderRuntimeEventFeed;
const checkpointStore = yield* CheckpointStore;
const receiptBus = yield* RuntimeReceiptBus;

Expand Down Expand Up @@ -782,7 +784,7 @@ const make = Effect.gen(function* () {
);

yield* Effect.forkScoped(
Stream.runForEach(providerService.streamEvents, (event) => {
Stream.runForEach(providerRuntimeEventFeed.subscribeWithReplay(), (event) => {
if (event.type !== "turn.started" && event.type !== "turn.completed") {
return Effect.void;
}
Expand Down
9 changes: 4 additions & 5 deletions apps/server/src/orchestration/Layers/OrchestrationReactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ export const makeOrchestrationReactor = Effect.gen(function* () {
const providerCommandReactor = yield* ProviderCommandReactor;
const checkpointReactor = yield* CheckpointReactor;

const start: OrchestrationReactorShape["start"] = Effect.gen(function* () {
yield* providerRuntimeIngestion.start;
yield* providerCommandReactor.start;
yield* checkpointReactor.start;
});
const start: OrchestrationReactorShape["start"] = Effect.all(
[providerRuntimeIngestion.start, providerCommandReactor.start, checkpointReactor.start],
{ concurrency: "unbounded", discard: true },
);

return {
start,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
ThreadId,
TurnId,
} from "@okcode/contracts";
import { Effect, Exit, Layer, ManagedRuntime, PubSub, Scope, Stream } from "effect";
import { Effect, Exit, Layer, ManagedRuntime, Scope, Stream } from "effect";
import { afterEach, describe, expect, it } from "vitest";

import { OrchestrationEventStoreLive } from "../../persistence/Layers/OrchestrationEventStore.ts";
Expand All @@ -29,6 +29,8 @@ import {
ProviderService,
type ProviderServiceShape,
} from "../../provider/Services/ProviderService.ts";
import { ProviderRuntimeEventFeedLive } from "../../provider/Layers/ProviderRuntimeEventFeed.ts";
import { ProviderRuntimeEventFeed } from "../../provider/Services/ProviderRuntimeEventFeed.ts";
import { OrchestrationEngineLive } from "./OrchestrationEngine.ts";
import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts";
import { ProviderRuntimeIngestionLive } from "./ProviderRuntimeIngestion.ts";
Expand Down Expand Up @@ -61,7 +63,6 @@ type LegacyProviderRuntimeEvent = {
};

function createProviderServiceHarness() {
const runtimeEventPubSub = Effect.runSync(PubSub.unbounded<ProviderRuntimeEvent>());
const runtimeSessions: ProviderSession[] = [];

const unsupported = () => Effect.die(new Error("Unsupported provider call in test")) as never;
Expand All @@ -75,7 +76,7 @@ function createProviderServiceHarness() {
listSessions: () => Effect.succeed([...runtimeSessions]),
getCapabilities: () => Effect.succeed({ sessionModelSwitch: "in-session" }),
rollbackConversation: () => unsupported(),
streamEvents: Stream.fromPubSub(runtimeEventPubSub),
streamEvents: Stream.empty,
};

const setSession = (session: ProviderSession): void => {
Expand All @@ -87,13 +88,8 @@ function createProviderServiceHarness() {
runtimeSessions.push(session);
};

const emit = (event: LegacyProviderRuntimeEvent): void => {
Effect.runSync(PubSub.publish(runtimeEventPubSub, event as unknown as ProviderRuntimeEvent));
};

return {
service,
emit,
setSession,
};
}
Expand Down Expand Up @@ -169,12 +165,14 @@ describe("ProviderRuntimeIngestion", () => {
Layer.provideMerge(orchestrationLayer),
Layer.provideMerge(SqlitePersistenceMemory),
Layer.provideMerge(Layer.succeed(ProviderService, provider.service)),
Layer.provideMerge(ProviderRuntimeEventFeedLive),
Layer.provideMerge(ServerConfig.layerTest(process.cwd(), process.cwd())),
Layer.provideMerge(NodeServices.layer),
);
runtime = ManagedRuntime.make(layer);
const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService));
const ingestion = await runtime.runPromise(Effect.service(ProviderRuntimeIngestionService));
const eventFeed = await runtime.runPromise(Effect.service(ProviderRuntimeEventFeed));
scope = await Effect.runPromise(Scope.make("sequential"));
await Effect.runPromise(ingestion.start.pipe(Scope.provide(scope)));
const drain = () => Effect.runPromise(ingestion.drain);
Expand Down Expand Up @@ -234,7 +232,8 @@ describe("ProviderRuntimeIngestion", () => {

return {
engine,
emit: provider.emit,
emit: (event: LegacyProviderRuntimeEvent) =>
Effect.runSync(eventFeed.publish(event as unknown as ProviderRuntimeEvent)),
setProviderSession: provider.setSession,
drain,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { Cache, Cause, Duration, Effect, Layer, Option, Ref, Stream } from "effe
import { makeDrainableWorker } from "@okcode/shared/DrainableWorker";

import { ProviderService } from "../../provider/Services/ProviderService.ts";
import { ProviderRuntimeEventFeed } from "../../provider/Services/ProviderRuntimeEventFeed.ts";
import { ProjectionTurnRepository } from "../../persistence/Services/ProjectionTurns.ts";
import { ProjectionTurnRepositoryLive } from "../../persistence/Layers/ProjectionTurns.ts";
import { resolveThreadWorkspaceCwd } from "../../checkpointing/Utils.ts";
Expand Down Expand Up @@ -537,6 +538,7 @@ function runtimeEventToActivities(
const make = Effect.gen(function* () {
const orchestrationEngine = yield* OrchestrationEngineService;
const providerService = yield* ProviderService;
const providerRuntimeEventFeed = yield* ProviderRuntimeEventFeed;
const projectionTurnRepository = yield* ProjectionTurnRepository;

const assistantDeliveryModeRef = yield* Ref.make<AssistantDeliveryMode>(
Expand Down Expand Up @@ -1285,7 +1287,7 @@ const make = Effect.gen(function* () {

const start: ProviderRuntimeIngestionShape["start"] = Effect.gen(function* () {
yield* Effect.forkScoped(
Stream.runForEach(providerService.streamEvents, (event) =>
Stream.runForEach(providerRuntimeEventFeed.subscribeWithReplay(), (event) =>
worker.enqueue({ source: "runtime", event }),
),
);
Expand Down
43 changes: 43 additions & 0 deletions apps/server/src/provider/Layers/ProviderRuntimeEventFeed.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { EventId, ThreadId, TurnId, type ProviderRuntimeEvent } from "@okcode/contracts";
import { it } from "@effect/vitest";
import { describe, expect } from "vitest";
import { Effect, Layer, Stream } from "effect";

import { ProviderRuntimeEventFeedLive } from "./ProviderRuntimeEventFeed.ts";
import { ProviderRuntimeEventFeed } from "../Services/ProviderRuntimeEventFeed.ts";

function makeTurnStartedEvent(id: string): ProviderRuntimeEvent {
return {
type: "turn.started",
eventId: EventId.makeUnsafe(id),
provider: "codex",
threadId: ThreadId.makeUnsafe("thread-1"),
turnId: TurnId.makeUnsafe(`turn-${id}`),
createdAt: "2026-01-01T00:00:00.000Z",
};
}

describe("ProviderRuntimeEventFeedLive", () => {
it.effect("replays buffered events to late subscribers before live delivery", () =>
Effect.gen(function* () {
const feed = yield* ProviderRuntimeEventFeed;

yield* feed.publish(makeTurnStartedEvent("evt-1"));
yield* feed.publish(makeTurnStartedEvent("evt-2"));

const events = yield* Stream.take(feed.subscribeWithReplay(), 3).pipe(
Stream.runCollect,
Effect.fork,
);

yield* feed.publish(makeTurnStartedEvent("evt-3"));

const collected = yield* Effect.fromFiber(events);
expect(Array.from(collected).map((event) => event.eventId)).toEqual([
"evt-1",
"evt-2",
"evt-3",
]);
}).pipe(Effect.provide(Layer.mergeAll(ProviderRuntimeEventFeedLive))),
);
});
98 changes: 98 additions & 0 deletions apps/server/src/provider/Layers/ProviderRuntimeEventFeed.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import type { ProviderRuntimeEvent } from "@okcode/contracts";
import { Effect, Layer, Queue, Ref, Scope, Stream } from "effect";
import * as Semaphore from "effect/Semaphore";

import {
ProviderRuntimeEventFeed,
type ProviderRuntimeEventFeedShape,
} from "../Services/ProviderRuntimeEventFeed.ts";

const PROVIDER_RUNTIME_EVENT_REPLAY_CAPACITY = 256;

interface FeedState {
readonly buffer: ReadonlyArray<ProviderRuntimeEvent>;
readonly subscribers: ReadonlySet<Queue.Queue<ProviderRuntimeEvent>>;
}

const appendToReplayBuffer = (
buffer: ReadonlyArray<ProviderRuntimeEvent>,
event: ProviderRuntimeEvent,
): ReadonlyArray<ProviderRuntimeEvent> => {
if (buffer.length < PROVIDER_RUNTIME_EVENT_REPLAY_CAPACITY) {
return [...buffer, event];
}
return [...buffer.slice(1), event];
};

const makeProviderRuntimeEventFeed = Effect.gen(function* () {
const stateRef = yield* Ref.make<FeedState>({
buffer: [],
subscribers: new Set<Queue.Queue<ProviderRuntimeEvent>>(),
});
const mutex = yield* Semaphore.make(1);

const publish: ProviderRuntimeEventFeedShape["publish"] = (event) =>
mutex.withPermits(1)(
Effect.gen(function* () {
const subscribers = yield* Ref.modify(stateRef, (state) => {
const nextState: FeedState = {
buffer: appendToReplayBuffer(state.buffer, event),
subscribers: state.subscribers,
};
return [Array.from(state.subscribers), nextState] as const;
});
yield* Effect.forEach(subscribers, (subscriber) => Queue.offer(subscriber, event), {
discard: true,
});
}),
);

const subscribeWithReplay: ProviderRuntimeEventFeedShape["subscribeWithReplay"] = () =>
Stream.unwrapScoped(
Effect.gen(function* () {
const subscriber = yield* Queue.unbounded<ProviderRuntimeEvent>();
const replay = yield* mutex.withPermits(1)(
Ref.modify(stateRef, (state) => {
const subscribers = new Set(state.subscribers);
subscribers.add(subscriber);
return [
state.buffer,
{
buffer: state.buffer,
subscribers,
} satisfies FeedState,
] as const;
}),
);

yield* Effect.forEach(replay, (event) => Queue.offer(subscriber, event), {
discard: true,
});

yield* Scope.addFinalizer(() =>
mutex.withPermits(1)(
Ref.update(stateRef, (state) => {
const subscribers = new Set(state.subscribers);
subscribers.delete(subscriber);
return {
buffer: state.buffer,
subscribers,
} satisfies FeedState;
}),
),
);

return Stream.fromQueue(subscriber);
}),
);

return {
publish,
subscribeWithReplay,
} satisfies ProviderRuntimeEventFeedShape;
});

export const ProviderRuntimeEventFeedLive = Layer.effect(
ProviderRuntimeEventFeed,
makeProviderRuntimeEventFeed,
);
Loading
Loading