Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
290 changes: 290 additions & 0 deletions docs/reflection-v2-protocol.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
# Genkit Reflection Protocol V2 (WebSocket)

This document outlines the design for the V2 Reflection API, which uses WebSockets for bidirectional communication between the Genkit CLI (Runtime Manager) and Genkit Runtimes (User Applications).

## Overview

In V2, the connection direction is reversed compared to V1:
- **Server**: The Genkit CLI (`RuntimeManagerV2`) starts a WebSocket server.
- **Client**: The Genkit Runtime connects to the CLI's WebSocket server.

This architecture allows the CLI to easily manage multiple runtimes (e.g., for multi-service projects) and eliminates the need for runtimes to manage their own HTTP servers and ports for reflection.

## Transport

- **Protocol**: WebSocket
- **Data Format**: JSON
- **Message Structure**: JSON-RPC 2.0 (modified for streaming)

## Message Format

All messages follow the JSON-RPC 2.0 specification.

### Request
```json
{
"jsonrpc": "2.0",
"method": "methodName",
"params": { ... },
"id": 1
}
```
*Note: The `id` is generated by the sender (Manager). It can be a number (auto-incrementing) or a string (UUID). It must be unique for the pending request within the WebSocket session.*

### Response (Success)
```json
{
"jsonrpc": "2.0",
"result": { ... },
"id": 1
}
```

### Response (Error)
```json
{
"jsonrpc": "2.0",
"error": {
"code": -32000,
"message": "Error message",
"data": { ... } // Optional details (stack trace, etc.)
},
"id": 1
}
```

### Notification
A request without an `id`.
```json
{
"jsonrpc": "2.0",
"method": "methodName",
"params": { ... }
}
```

## Streaming Extension

JSON-RPC 2.0 does not natively support streaming. We extend it by using Notifications from the Runtime to the Manager associated with a specific Request ID.

### Stream Chunk Notification
Sent by the Runtime during a streaming `runAction` request.

```json
{
"jsonrpc": "2.0",
"method": "streamChunk",
"params": {
"requestId": 1, // Matches the ID of the runAction request
"chunk": { ... } // The chunk data
}
}
```

### Run Action State Notification
Sent by the Runtime to provide status updates or metadata (like trace ID) while the action is running, before the result is ready.

```json
{
"jsonrpc": "2.0",
"method": "runActionState",
"params": {
"requestId": 1, // Matches the ID of the runAction request
"state": {
"traceId": "..."
}
}
}
```

## Protocol Flow

### 1. Registration (Runtime -> Manager)

Upon connection, the Runtime must register itself.

**Request (Runtime -> Manager):**
- **Method**: `register`
- **Params**:
```typescript
interface RegisterParams {
id: string; // Unique Runtime ID
pid: number; // Process ID
name?: string; // App name
genkitVersion: string; // e.g., "0.9.0"
reflectionApiSpecVersion: number;
envs?: string[]; // Configured environments
}
```

**Response (Manager -> Runtime):**
- **Result**: `void` (null)

### 2. Configuration (Manager -> Runtime)

The Manager may push configuration updates to the Runtime, such as the Telemetry Server URL.

**Notification (Manager -> Runtime):**
- **Method**: `configure`
- **Params**:
```typescript
interface ConfigureParams {
telemetryServerUrl?: string;
}
```

### 3. List Actions (Manager -> Runtime)

The Manager requests the list of available actions/flows.

**Request (Manager -> Runtime):**
- **Method**: `listActions`
- **Params**: `void` (empty object or null)

**Response (Runtime -> Manager):**
- **Result**: `Record<string, Action>` (Same schema as V1 `/api/actions`)

### 4. Run Action (Manager -> Runtime)

The Manager requests the execution of an action.

**Request (Manager -> Runtime):**
- **Method**: `runAction`
- **Params**:
```typescript
interface RunActionParams {
key: string; // Action key (e.g., "flowName")
input: any; // Input payload
context?: any; // Context data
telemetryLabels?: Record<string, string>;
stream?: boolean; // Whether to stream results
}
```

**Scenario A: Non-Streaming Response**

1. **Notification (Runtime -> Manager)**: `runActionState` (optional, repeated)
- Used to send early trace info or status updates.
- `params.requestId`: Matches request ID.
- `params.state`: The state update (e.g., traceId).

2. **Response (Runtime -> Manager)**:
- **Result**:
```typescript
interface RunActionResult {
result: any; // The return value
telemetry?: {
traceId?: string;
};
}
```

**Scenario B: Streaming Response**

1. **Notification (Runtime -> Manager)**: `runActionState` (optional, repeated)
- Used to send early trace info or status updates.

2. **Notification (Runtime -> Manager)**: `streamChunk` (repeated)
- `params.requestId`: Matches request ID.
- `params.chunk`: The partial result.

3. **Response (Runtime -> Manager)**: Final result.
- **Result**: Same as Non-Streaming (`RunActionResult`). Signals the end of the stream.

### 5. Health Checks

The WebSocket connection state itself serves as a basic health check.
- **Heartbeats**: Standard WebSocket Ping/Pong frames should be used to maintain the connection and detect timeouts.

## Compatibility

- **V1**: HTTP Server on Runtime, Polling/Request from CLI.
- **V2**: WebSocket Server on CLI, Persistent Connection from Runtime.

The CLI will determine which mode to use based on the `--experimental-reflection-v2` flag.

## Example: Streaming Flow Execution

Below is an example sequence of messages for running a flow named `myFlow` with streaming enabled.

**1. Manager Requests Execution**
```json
// Request (Manager -> Runtime)
{
"jsonrpc": "2.0",
"method": "runAction",
"params": {
"key": "/flow/myFlow",
"input": "Describe a cat",
"stream": true
},
"id": 100
}
```

**2. Runtime Sends Early Trace ID**
```json
// Notification (Runtime -> Manager)
{
"jsonrpc": "2.0",
"method": "runActionState",
"params": {
"requestId": 100,
"state": {
"traceId": "abc-123-trace-id"
}
}
}
```

**3. Runtime Sends Stream Chunks**
```json
// Notification (Runtime -> Manager)
{
"jsonrpc": "2.0",
"method": "streamChunk",
"params": {
"requestId": 100,
"chunk": { "content": [{ "text": "A cat is "}] }
}
}
```

```json
// Notification (Runtime -> Manager)
{
"jsonrpc": "2.0",
"method": "streamChunk",
"params": {
"requestId": 100,
"chunk": { "content": [{ "text": "a small "}] }
}
}
```

```json
// Notification (Runtime -> Manager)
{
"jsonrpc": "2.0",
"method": "streamChunk",
"params": {
"requestId": 100,
"chunk": { "content": [{ "text": "feline."}] }
}
}
```

**4. Runtime Sends Final Result**
```json
// Response (Runtime -> Manager)
{
"jsonrpc": "2.0",
"result": {
"result": "A cat is a small feline.",
"telemetry": {
"traceId": "abc-123-trace-id"
}
},
"id": 100
}
```
23 changes: 21 additions & 2 deletions genkit-tools/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ interface RunOptions {
noui?: boolean;
port?: string;
open?: boolean;
experimentalReflectionV2?: boolean;
allowedTelemetryCorsHostnames?: string[];
}

/** Command to run code in dev mode and/or the Dev UI. */
Expand All @@ -34,6 +36,15 @@ export const start = new Command('start')
.option('-n, --noui', 'do not start the Dev UI', false)
.option('-p, --port <port>', 'port for the Dev UI')
.option('-o, --open', 'Open the browser on UI start up')
.option(
'--experimental-reflection-v2',
'start the experimental reflection server (WebSocket)'
)
.option(
'--allowed-telemetry-cors-hostnames <hostnames>',
'comma separated list of allowed telemetry CORS hostnames',
(value) => value.split(',')
)
.action(async (options: RunOptions) => {
const projectRoot = await findProjectRoot();
if (projectRoot.includes('/.Trash/')) {
Expand All @@ -49,14 +60,22 @@ export const start = new Command('start')
const result = await startDevProcessManager(
projectRoot,
start.args[0],
start.args.slice(1)
start.args.slice(1),
options.experimentalReflectionV2,
options.allowedTelemetryCorsHostnames
);
manager = result.manager;
processPromise = result.processPromise;
} else {
manager = await startManager(projectRoot, true);
manager = await startManager(
projectRoot,
true,
options.experimentalReflectionV2,
options.allowedTelemetryCorsHostnames
);
processPromise = new Promise(() => {});
}

if (!options.noui) {
let port: number;
if (options.port) {
Expand Down
Loading
Loading