Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/slimy-yaks-talk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@voltagent/ag-ui": patch
---

feat(ag-ui): add ACTIVITY_SNAPSHOT and ACTIVITY_DELTA event support
159 changes: 151 additions & 8 deletions packages/ag-ui/src/voltagent-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,29 @@ import type { Agent } from "@voltagent/core";
import { safeStringify } from "@voltagent/internal/utils";
import { Observable } from "rxjs";

// ---------------------------------------------------------------------------
// Activity event types – not yet available in @ag-ui/core v0.0.37, so we
// define them locally until the upstream package adds them.
// ---------------------------------------------------------------------------

const ACTIVITY_SNAPSHOT_EVENT_TYPE = "ACTIVITY_SNAPSHOT" as const;
const ACTIVITY_DELTA_EVENT_TYPE = "ACTIVITY_DELTA" as const;

type ActivitySnapshotEvent = {
type: typeof ACTIVITY_SNAPSHOT_EVENT_TYPE;
messageId: string;
activityType: string;
content: Record<string, unknown>;
replace: boolean;
};

type ActivityDeltaEvent = {
type: typeof ACTIVITY_DELTA_EVENT_TYPE;
messageId: string;
activityType: string;
patch: unknown[];
};

type AGUIContextValue = {
state?: Record<string, unknown>;
context?: RunAgentInput["context"];
Expand Down Expand Up @@ -95,7 +118,7 @@ export class VoltAgentAGUI extends AbstractAgent {
for (const event of events) {
debugLog("emit event", { type: event.type, messageId: (event as any).messageId });

subscriber.next(event);
subscriber.next(event as BaseEvent);
if (
(event.type === EventType.TEXT_MESSAGE_START ||
event.type === EventType.TEXT_MESSAGE_CHUNK ||
Expand Down Expand Up @@ -345,11 +368,25 @@ function convertAGUIMessagesToVoltMessages(messages: Message[]): VoltUIMessage[]
}

// activity or any other custom role -> fold into assistant text
convertedMessages.push({
id: messageId,
role: "assistant",
parts: [{ type: "text", text: safeStringify((msg as any).content ?? "") }],
});
{
const activityContent = (msg as { content?: unknown }).content;
const activityType = (msg as { activityType?: string }).activityType;
const prefix = activityType ? `[Activity: ${activityType}] ` : "";
convertedMessages.push({
id: messageId,
role: "assistant",
parts: [
{
type: "text",
text:
prefix +
(typeof activityContent === "string"
? activityContent
: safeStringify(activityContent ?? "")),
},
],
});
}
}

return convertedMessages;
Expand Down Expand Up @@ -392,7 +429,9 @@ type StreamConversionResult =
| ToolCallStartEvent
| ToolCallEndEvent
| ToolCallArgsEvent
| ToolCallResultEvent;
| ToolCallResultEvent
| ActivitySnapshotEvent
| ActivityDeltaEvent;

function convertVoltStreamPartToEvents(
part: VoltAgentTextStreamPart,
Expand Down Expand Up @@ -476,6 +515,19 @@ function convertVoltStreamPartToEvents(
(payload as { output?: unknown }).output ??
(payload as { data?: unknown }).data ??
payload;

const activityEvents = tryExtractActivityFromToolResult(rawResult, fallbackMessageId);
if (activityEvents) {
const resultEvent: ToolCallResultEvent = {
type: EventType.TOOL_CALL_RESULT,
toolCallId: (payload as { toolCallId?: string }).toolCallId ?? generateId(),
content: safeStringify(rawResult ?? {}),
messageId: generateId(),
role: "tool",
};
return [...activityEvents, resultEvent];
}

const resultEvent: ToolCallResultEvent = {
type: EventType.TOOL_CALL_RESULT,
toolCallId: (payload as { toolCallId?: string }).toolCallId ?? generateId(),
Expand All @@ -494,15 +546,106 @@ function convertVoltStreamPartToEvents(
};
return [errorEvent];
}
default:
default: {
// Handle activity-snapshot and activity-delta as string comparisons
// since VoltAgentTextStreamPart doesn't include these discriminants yet.
const typeStr = partType as string;
if (typeStr === "activity-snapshot") {
const messageId = (part as { messageId?: string }).messageId ?? fallbackMessageId;
const activityType = (payload as { activityType?: string }).activityType ?? "activity";
const content =
((payload as { content?: Record<string, unknown> }).content as Record<string, unknown>) ??
{};
const replace = (payload as { replace?: boolean }).replace ?? true;
const event: ActivitySnapshotEvent = {
type: ACTIVITY_SNAPSHOT_EVENT_TYPE,
messageId,
activityType,
content,
replace,
};
return [event];
}
if (typeStr === "activity-delta") {
const messageId = (part as { messageId?: string }).messageId ?? fallbackMessageId;
const activityType = (payload as { activityType?: string }).activityType ?? "activity";
const patch = (payload as { patch?: unknown[] }).patch ?? [];
const event: ActivityDeltaEvent = {
type: ACTIVITY_DELTA_EVENT_TYPE,
messageId,
activityType,
patch,
};
return [event];
}
return null;
}
}
}

export function createVoltAgentAGUI(config: VoltAgentAGUIConfig): VoltAgentAGUI {
return new VoltAgentAGUI(config);
}

function tryExtractActivityFromToolResult(
rawResult: unknown,
fallbackMessageId: string,
): (ActivitySnapshotEvent | ActivityDeltaEvent)[] | null {
if (!rawResult || typeof rawResult !== "object") {
return null;
}

// Single activity object
if (!Array.isArray(rawResult)) {
const obj = rawResult as Record<string, unknown>;
if (typeof obj.activityType === "string" && obj.content && typeof obj.content === "object") {
const event: ActivitySnapshotEvent = {
type: ACTIVITY_SNAPSHOT_EVENT_TYPE,
messageId: fallbackMessageId,
activityType: obj.activityType,
content: obj.content as Record<string, unknown>,
replace: typeof obj.replace === "boolean" ? obj.replace : true,
};
return [event];
}
if (typeof obj.activityType === "string" && Array.isArray(obj.patch)) {
const event: ActivityDeltaEvent = {
type: ACTIVITY_DELTA_EVENT_TYPE,
messageId: fallbackMessageId,
activityType: obj.activityType,
patch: obj.patch,
};
return [event];
}
return null;
}

// Array of activity objects
const events: (ActivitySnapshotEvent | ActivityDeltaEvent)[] = [];
for (const item of rawResult) {
if (!item || typeof item !== "object") continue;
const obj = item as Record<string, unknown>;
if (typeof obj.activityType === "string" && obj.content && typeof obj.content === "object") {
events.push({
type: ACTIVITY_SNAPSHOT_EVENT_TYPE,
messageId: fallbackMessageId,
activityType: obj.activityType,
content: obj.content as Record<string, unknown>,
replace: typeof obj.replace === "boolean" ? obj.replace : true,
});
} else if (typeof obj.activityType === "string" && Array.isArray(obj.patch)) {
events.push({
type: ACTIVITY_DELTA_EVENT_TYPE,
messageId: fallbackMessageId,
activityType: obj.activityType,
patch: obj.patch,
});
}
}

return events.length > 0 ? events : null;
}

function generateId(): string {
const cryptoApi = typeof globalThis !== "undefined" ? (globalThis as any).crypto : undefined;
if (cryptoApi && typeof cryptoApi.randomUUID === "function") {
Expand Down