diff --git a/PROTOCOL.md b/PROTOCOL.md index fd3823c..ff7769c 100644 --- a/PROTOCOL.md +++ b/PROTOCOL.md @@ -38,6 +38,11 @@ Dispatch a user message to a session. | taskId | uuid | | | sessionId | uuid | | | channelId | string | Routes stream events back to the correct browser tab | +| attemptId | uuid? | Active task attempt created by the relay. | +| attemptNumber | int? | Monotonic attempt number for the task. | +| leaseExpiresAt | string? | ISO timestamp for the active relay lease. | +| deadlineProfile | TaskDeadlines? | Daemon supervision deadline profile in milliseconds. | +| turnKind | string? | `user`, `session_title`, `context_stats`, `compact`, or `control`. | | prompt | string | | | engine | "pi"? | Optional task execution engine. Empty means `"pi"`. | | provider | string? | Optional Pi provider id. Empty means `"claude-cli"`. | @@ -69,8 +74,24 @@ Dispatch a user message to a session. | Field | Type | Notes | |---|---|---| | token | string | Bearer token scoped to one task. | +| id | string? | Capability record identifier. | +| attemptId | uuid? | Attempt that owns this capability. | | apiBaseUrl | string | Cloud app base URL for `/api/agent-plan/*`. | | expiresAt | string | ISO timestamp. | +| snapshot | object? | Capability metadata snapshot used by cloud-side authorization. | + +`TaskDeadlines`: + +| Field | Type | Notes | +|---|---|---| +| processStartMs | int? | Process launch deadline. | +| promptWriteMs | int? | Prompt write deadline. | +| firstEventMs | int? | Deadline for the first parsed runtime event. | +| firstVisibleEventMs | int? | Deadline for the first user-visible runtime event. | +| streamIdleMs | int? | Stream inactivity deadline. | +| toolIdleMs | int? | Tool execution inactivity deadline. | +| userInputMs | int? | User input wait deadline. | +| cleanupTermMs | int? | Grace period for process cleanup. | `Task.contextRefs` carries project-relative file and folder references selected in the cloud composer. 
The relay forwards this field only to daemons that advertise `Hello.capabilities.contextRefs`. @@ -221,6 +242,63 @@ reconnect recovery happens by reloading persisted messages by session and sequence from the database. There is no daemon ↔ relay ack/replay/WAL handshake in protocol version 1. +### Task attempt lifecycle + +The relay owns task attempts. A dispatched `task` includes the active +`attemptId`, `attemptNumber`, `leaseExpiresAt`, `deadlineProfile`, and +`turnKind`. Daemons echo attempt metadata on task-adjacent frames so the relay +can associate runtime events with the active attempt. + +`taskLifecycle` is the structured lifecycle frame for attempt diagnostics: + +| Field | Type | Notes | +|---|---|---| +| type | "taskLifecycle" | | +| taskId | uuid | | +| attemptId | uuid | | +| attemptNumber | int | | +| sessionId | uuid | | +| channelId | string | | +| phase | string | Lifecycle phase. | +| status | string | Durable attempt status. | +| retryable | boolean? | Terminal retry-safety hint. | +| failureCode | string? | Stable terminal failure code. | +| message | string? | Operator-facing detail. | +| userMessage | string? | User-facing failure detail. | +| observedAt | string | RFC3339 timestamp. | +| deadlineAt | string? | Deadline associated with the phase. | +| pid | int? | Local process id when known. | +| provider | string? | Pi provider id. | +| model | string? | Provider model id. | +| requestId | uuid? | Root correlation id. | +| traceparent | string? | W3C trace context. | + +Lifecycle phases are `accepted`, `queued`, `started`, `pi_started`, +`prompt_written`, `first_event_seen`, `first_visible_event_seen`, `streaming`, +`tool_started`, `tool_finished`, `waiting_input`, `input_received`, +`cleanup_started`, `cleanup_finished`, `heartbeat`, `retry_scheduled`, +`completed`, `failed`, `canceled`, `timed_out`, and `lost`. 
+ +Attempt statuses are `created`, `queued`, `started`, `pi_started`, +`prompt_written`, `first_event_seen`, `first_visible_event_seen`, `streaming`, +`waiting_input`, `tool_running`, `cleanup_started`, `cleanup_finished`, +`completed`, `failed`, `canceled`, `timed_out`, and `lost`. + +The relay filters stale lifecycle frames by the active tuple +`(taskId, attemptId, sessionId, machineId)`. Frames that do not match the active +attempt are ignored for aggregate task state but may still be logged for +diagnostics. Terminal lifecycle phases map to task aggregate states: +`completed`, `failed`, `canceled`, `timed_out`, and `lost`. + +Retry safety is attempt-local. A timeout before visible output or side effects +can be marked `retryable`; phases after visible output or tool execution make +automatic retry unsafe unless a higher-level policy explicitly allows it. + +Tasks carry `turnKind` to distinguish user-visible work (`user`) from local +maintenance such as session title generation, context stats, compaction, and +daemon control flows. Attempt fields are additive JSON fields, so consumers +that do not understand them ignore them. + ### `planningEvent` Append-only planning journal event sent from a source runtime to the relay. 
diff --git a/envelope.go b/envelope.go index cae84c6..a84ead3 100644 --- a/envelope.go +++ b/envelope.go @@ -13,6 +13,26 @@ type Envelope struct { Payload any } +func (e Envelope) DecodePayload() (any, error) { + payload, err := payloadForType(e.Type) + if err != nil { + return nil, err + } + switch raw := e.Payload.(type) { + case json.RawMessage: + if err := json.Unmarshal(raw, payload); err != nil { + return nil, fmt.Errorf("decode %s: %w", e.Type, err) + } + case []byte: + if err := json.Unmarshal(raw, payload); err != nil { + return nil, fmt.Errorf("decode %s: %w", e.Type, err) + } + default: + return e.Payload, nil + } + return payload, nil +} + // ParseEnvelope reads raw JSON, looks at the type field, and unmarshals // into the correct concrete struct. func ParseEnvelope(data []byte) (*Envelope, error) { @@ -63,6 +83,8 @@ func payloadForType(msgType string) (any, error) { switch msgType { case MsgTypeTask: return &Task{}, nil + case MsgTypeTaskLifecycle: + return &TaskLifecycle{}, nil case MsgTypeStop: return &Stop{}, nil case MsgTypePermissionResponse: diff --git a/messages.go b/messages.go index 145d2b8..b3841d8 100644 --- a/messages.go +++ b/messages.go @@ -13,6 +13,7 @@ type MessageType = string // Message type constants. 
// PlanCapability is the bearer capability a Task carries for the cloud app's
// /api/agent-plan/* API.
//
//   - ID: capability record identifier (omitted from JSON when empty).
//   - AttemptID: attempt that owns this capability (omitted when empty).
//   - Token: bearer token scoped to one task.
//   - APIBaseURL: cloud app base URL.
//   - ExpiresAt: ISO timestamp.
//   - Snapshot: capability metadata snapshot used by cloud-side authorization,
//     kept as raw JSON and omitted when empty.
type PlanCapability struct {
	ID         string          `json:"id,omitempty"`
	AttemptID  string          `json:"attemptId,omitempty"`
	Token      string          `json:"token"`
	APIBaseURL string          `json:"apiBaseUrl"`
	ExpiresAt  string          `json:"expiresAt"`
	Snapshot   json.RawMessage `json:"snapshot,omitempty"`
}

// TurnKind is the wire value of a task's turnKind field. It distinguishes
// user-visible work from local maintenance turns.
type TurnKind string

// Turn kinds listed by the protocol for the turnKind field.
const (
	TurnKindUser         TurnKind = "user"
	TurnKindSessionTitle TurnKind = "session_title"
	TurnKindContextStats TurnKind = "context_stats"
	TurnKindCompact      TurnKind = "compact"
	TurnKindControl      TurnKind = "control"
)

// TaskDeadlines is the daemon supervision deadline profile. Every value is in
// milliseconds, and a zero value is omitted from JSON.
//
//   - ProcessStartMs: process launch deadline.
//   - PromptWriteMs: prompt write deadline.
//   - FirstEventMs: deadline for the first parsed runtime event.
//   - FirstVisibleEventMs: deadline for the first user-visible runtime event.
//   - StreamIdleMs: stream inactivity deadline.
//   - ToolIdleMs: tool execution inactivity deadline.
//   - UserInputMs: user input wait deadline.
//   - CleanupTermMs: grace period for process cleanup.
type TaskDeadlines struct {
	ProcessStartMs      int `json:"processStartMs,omitempty"`
	PromptWriteMs       int `json:"promptWriteMs,omitempty"`
	FirstEventMs        int `json:"firstEventMs,omitempty"`
	FirstVisibleEventMs int `json:"firstVisibleEventMs,omitempty"`
	StreamIdleMs        int `json:"streamIdleMs,omitempty"`
	ToolIdleMs          int `json:"toolIdleMs,omitempty"`
	UserInputMs         int `json:"userInputMs,omitempty"`
	CleanupTermMs       int `json:"cleanupTermMs,omitempty"`
}

// TaskLifecyclePhase is the wire value of the phase field of a TaskLifecycle
// frame.
type TaskLifecyclePhase string

// Lifecycle phases. The set is not identical to the TaskAttemptStatus set:
// accepted, tool_started, tool_finished, input_received, heartbeat and
// retry_scheduled exist only as phases.
const (
	TaskLifecyclePhaseAccepted              TaskLifecyclePhase = "accepted"
	TaskLifecyclePhaseQueued                TaskLifecyclePhase = "queued"
	TaskLifecyclePhaseStarted               TaskLifecyclePhase = "started"
	TaskLifecyclePhasePiStarted             TaskLifecyclePhase = "pi_started"
	TaskLifecyclePhasePromptWritten         TaskLifecyclePhase = "prompt_written"
	TaskLifecyclePhaseFirstEventSeen        TaskLifecyclePhase = "first_event_seen"
	TaskLifecyclePhaseFirstVisibleEventSeen TaskLifecyclePhase = "first_visible_event_seen"
	TaskLifecyclePhaseStreaming             TaskLifecyclePhase = "streaming"
	TaskLifecyclePhaseToolStarted           TaskLifecyclePhase = "tool_started"
	TaskLifecyclePhaseToolFinished          TaskLifecyclePhase = "tool_finished"
	TaskLifecyclePhaseWaitingInput          TaskLifecyclePhase = "waiting_input"
	TaskLifecyclePhaseInputReceived         TaskLifecyclePhase = "input_received"
	TaskLifecyclePhaseCleanupStarted        TaskLifecyclePhase = "cleanup_started"
	TaskLifecyclePhaseCleanupFinished       TaskLifecyclePhase = "cleanup_finished"
	TaskLifecyclePhaseHeartbeat             TaskLifecyclePhase = "heartbeat"
	TaskLifecyclePhaseRetryScheduled        TaskLifecyclePhase = "retry_scheduled"
	TaskLifecyclePhaseCompleted             TaskLifecyclePhase = "completed"
	TaskLifecyclePhaseFailed                TaskLifecyclePhase = "failed"
	TaskLifecyclePhaseCanceled              TaskLifecyclePhase = "canceled"
	TaskLifecyclePhaseTimedOut              TaskLifecyclePhase = "timed_out"
	TaskLifecyclePhaseLost                  TaskLifecyclePhase = "lost"
)

// TaskAttemptStatus is the wire value of the status field of a TaskLifecycle
// frame: the durable status of the attempt.
type TaskAttemptStatus string

// Attempt statuses. created and tool_running exist only as statuses; they
// have no TaskLifecyclePhase counterpart.
const (
	TaskAttemptStatusCreated               TaskAttemptStatus = "created"
	TaskAttemptStatusQueued                TaskAttemptStatus = "queued"
	TaskAttemptStatusStarted               TaskAttemptStatus = "started"
	TaskAttemptStatusPiStarted             TaskAttemptStatus = "pi_started"
	TaskAttemptStatusPromptWritten         TaskAttemptStatus = "prompt_written"
	TaskAttemptStatusFirstEventSeen        TaskAttemptStatus = "first_event_seen"
	TaskAttemptStatusFirstVisibleEventSeen TaskAttemptStatus = "first_visible_event_seen"
	TaskAttemptStatusStreaming             TaskAttemptStatus = "streaming"
	TaskAttemptStatusWaitingInput          TaskAttemptStatus = "waiting_input"
	TaskAttemptStatusToolRunning           TaskAttemptStatus = "tool_running"
	TaskAttemptStatusCleanupStarted        TaskAttemptStatus = "cleanup_started"
	TaskAttemptStatusCleanupFinished       TaskAttemptStatus = "cleanup_finished"
	TaskAttemptStatusCompleted             TaskAttemptStatus = "completed"
	TaskAttemptStatusFailed                TaskAttemptStatus = "failed"
	TaskAttemptStatusCanceled              TaskAttemptStatus = "canceled"
	TaskAttemptStatusTimedOut              TaskAttemptStatus = "timed_out"
	TaskAttemptStatusLost                  TaskAttemptStatus = "lost"
)

// TaskLifecycle is the structured lifecycle frame for attempt diagnostics
// (message type "taskLifecycle").
//
//   - TaskID, AttemptID, AttemptNumber, SessionID, ChannelID: identify the
//     attempt the frame belongs to; always present in JSON.
//   - Phase / Status: lifecycle phase and durable attempt status.
//   - Retryable: terminal retry-safety hint. Because of omitempty a false
//     value is not written, so receivers cannot tell false from absent.
//   - FailureCode: stable terminal failure code.
//   - Message: operator-facing detail.
//   - UserMessage: user-facing failure detail.
//   - ObservedAt: observation time, encoded as an RFC 3339 timestamp.
//   - DeadlineAt: deadline associated with the phase; omitted when nil.
//   - PID: local process id when known.
//   - Provider / Model: Pi provider id and provider model id.
//   - RequestID: root correlation id.
//   - Traceparent: W3C trace context.
type TaskLifecycle struct {
	Type          MessageType        `json:"type"`
	TaskID        string             `json:"taskId"`
	AttemptID     string             `json:"attemptId"`
	AttemptNumber int                `json:"attemptNumber"`
	SessionID     string             `json:"sessionId"`
	ChannelID     string             `json:"channelId"`
	Phase         TaskLifecyclePhase `json:"phase"`
	Status        TaskAttemptStatus  `json:"status"`
	Retryable     bool               `json:"retryable,omitempty"`
	FailureCode   string             `json:"failureCode,omitempty"`
	Message       string             `json:"message,omitempty"`
	UserMessage   string             `json:"userMessage,omitempty"`
	ObservedAt    time.Time          `json:"observedAt"`
	DeadlineAt    *time.Time         `json:"deadlineAt,omitempty"`
	PID           int                `json:"pid,omitempty"`
	Provider      string             `json:"provider,omitempty"`
	Model         string             `json:"model,omitempty"`
	RequestID     string             `json:"requestId,omitempty"`
	Traceparent   string             `json:"traceparent,omitempty"`
}
// TaskStarted signals the daemon began processing a task.
//
// AttemptID and AttemptNumber echo the attempt metadata of the dispatched task
// so the relay can associate the frame with the active attempt; both are
// omitted from JSON when unset.
type TaskStarted struct {
	Type          string `json:"type"`
	TaskID        string `json:"taskId"`
	AttemptID     string `json:"attemptId,omitempty"`
	AttemptNumber int    `json:"attemptNumber,omitempty"`
	SessionID     string `json:"sessionId"`
	ChannelID     string `json:"channelId"`
	StartedAt     string `json:"startedAt"`
	RequestID     string `json:"requestId,omitempty"`
	Traceparent   string `json:"traceparent,omitempty"` // W3C trace context
}

// TaskError reports a failure.
//
// AttemptID and AttemptNumber echo the attempt metadata of the dispatched task
// and are omitted from JSON when unset. Retryable is omitted when false, so a
// receiver cannot tell false from absent.
//
// NOTE(review): FailureCode, Retryable and UserMessage share their names with
// the taskLifecycle fields described in PROTOCOL.md (stable terminal failure
// code, retry-safety hint, user-facing failure detail) — confirm they carry
// the same meaning on this frame.
type TaskError struct {
	Type          string `json:"type"`
	TaskID        string `json:"taskId"`
	AttemptID     string `json:"attemptId,omitempty"`
	AttemptNumber int    `json:"attemptNumber,omitempty"`
	SessionID     string `json:"sessionId"`
	ChannelID     string `json:"channelId"`
	Error         string `json:"error"`
	FailureCode   string `json:"failureCode,omitempty"`
	Retryable     bool   `json:"retryable,omitempty"`
	UserMessage   string `json:"userMessage,omitempty"`
	RequestID     string `json:"requestId,omitempty"`
	Traceparent   string `json:"traceparent,omitempty"` // W3C trace context
}

// TaskCancelled tells the relay/browser that a task was interrupted by the user.
//
// AttemptID and AttemptNumber echo the attempt metadata of the dispatched task
// and are omitted from JSON when unset. Retryable is omitted when false.
//
// NOTE(review): FailureCode, Retryable and UserMessage share their names with
// the taskLifecycle fields described in PROTOCOL.md — confirm they carry the
// same meaning on a cancellation frame.
type TaskCancelled struct {
	Type          string `json:"type"`
	TaskID        string `json:"taskId"`
	AttemptID     string `json:"attemptId,omitempty"`
	AttemptNumber int    `json:"attemptNumber,omitempty"`
	SessionID     string `json:"sessionId"`
	ChannelID     string `json:"channelId"`
	FailureCode   string `json:"failureCode,omitempty"`
	Retryable     bool   `json:"retryable,omitempty"`
	UserMessage   string `json:"userMessage,omitempty"`
	RequestID     string `json:"requestId,omitempty"`
	Traceparent   string `json:"traceparent,omitempty"` // W3C trace context
}

// PermissionRequest is Claude asking for tool approval.
type PermissionRequest struct {
	Type string `json:"type"`
	// TaskID, AttemptID and AttemptNumber echo the attempt metadata of the
	// dispatched task so the relay can associate the request with the active
	// attempt; all three are omitted from JSON when unset.
	TaskID        string          `json:"taskId,omitempty"`
	AttemptID     string          `json:"attemptId,omitempty"`
	AttemptNumber int             `json:"attemptNumber,omitempty"`
	SessionID     string          `json:"sessionId"`
	ChannelID     string          `json:"channelId"`
	RequestID     string          `json:"requestId"`
	ToolName      string          `json:"toolName"`
	ToolInput     json.RawMessage `json:"toolInput"`
}

// Question is Claude asking the user for input.
type Question struct {
	Type string `json:"type"`
	// TaskID, AttemptID and AttemptNumber echo the attempt metadata of the
	// dispatched task so the relay can associate the question with the active
	// attempt; all three are omitted from JSON when unset.
	TaskID        string           `json:"taskId,omitempty"`
	AttemptID     string           `json:"attemptId,omitempty"`
	AttemptNumber int              `json:"attemptNumber,omitempty"`
	SessionID     string           `json:"sessionId"`
	ChannelID     string           `json:"channelId"`
	RequestID     string           `json:"requestId"`
	Question      string           `json:"question"`
	Header        string           `json:"header,omitempty"`
	MultiSelect   bool             `json:"multiSelect,omitempty"`
	Options       []QuestionOption `json:"options,omitempty"`
}

// TestTaskLifecycleRoundTrip marshals a fully populated TaskLifecycle, wraps
// the bytes in an Envelope, and checks that DecodePayload returns a
// *TaskLifecycle with the attempt id and phase intact.
func TestTaskLifecycleRoundTrip(t *testing.T) {
	observedAt := time.Date(2026, 4, 30, 12, 30, 55, 0, time.UTC)
	deadlineAt := observedAt.Add(90 * time.Second)
	msg := &TaskLifecycle{
		Type:          MsgTypeTaskLifecycle,
		TaskID:        "task-1",
		AttemptID:     "attempt-1",
		AttemptNumber: 1,
		SessionID:     "session-1",
		ChannelID:     "channel-1",
		Phase:         TaskLifecyclePhasePromptWritten,
		Status:        TaskAttemptStatusPromptWritten,
		ObservedAt:    observedAt,
		DeadlineAt:    &deadlineAt,
		PID:           12345,
		Provider:      "claude-cli",
		Model:         "claude-sonnet-4-6",
		RequestID:     "request-1",
		Traceparent:   "00-b1a91ab1f89d5141c7280de8bd272d73-1111111111111111-01",
	}

	raw, err := json.Marshal(msg)
	if err != nil {
		t.Fatalf("marshal: %v", err)
	}

	// raw is a []byte, so this exercises DecodePayload's []byte branch.
	env := Envelope{Type: MsgTypeTaskLifecycle, Payload: raw}
	decoded, err := env.DecodePayload()
	if err != nil {
		t.Fatalf("decode: %v", err)
	}
	got, ok := decoded.(*TaskLifecycle)
	if !ok {
		t.Fatalf("decoded type = %T", decoded)
	}
	// Only the attempt id and phase are verified; other fields are not compared.
	if got.AttemptID != "attempt-1" || got.Phase != TaskLifecyclePhasePromptWritten {
		t.Fatalf("lifecycle fields missing: %#v", got)
	}
}

// TestTaskAttemptFieldsRoundTrip marshals a Task carrying attempt metadata, a
// deadline profile and a plan capability, unmarshals it again, and spot-checks
// that the attempt id, one deadline and the capability's attempt id survive.
func TestTaskAttemptFieldsRoundTrip(t *testing.T) {
	task := Task{
		Type:           MsgTypeTask,
		TaskID:         "task-1",
		SessionID:      "session-1",
		ChannelID:      "channel-1",
		Prompt:         "hello",
		Model:          "claude-sonnet-4-6",
		Effort:         "max",
		PermissionMode: "acceptEdits",
		CWD:            "/tmp/project",
		AttemptID:      "attempt-1",
		AttemptNumber:  2,
		LeaseExpiresAt: "2026-04-30T12:32:55Z",
		TurnKind:       TurnKindUser,
		DeadlineProfile: TaskDeadlines{
			ProcessStartMs:      15000,
			PromptWriteMs:       5000,
			FirstEventMs:        90000,
			FirstVisibleEventMs: 120000,
			StreamIdleMs:        120000,
			ToolIdleMs:          300000,
			UserInputMs:         600000,
			CleanupTermMs:       2000,
		},
		PlanCapability: &PlanCapability{
			ID:         "cap-1",
			AttemptID:  "attempt-1",
			Token:      "opaque",
			APIBaseURL: "https://app.gsd.build",
			ExpiresAt:  "2026-04-30T12:45:55Z",
		},
	}

	raw, err := json.Marshal(task)
	if err != nil {
		t.Fatalf("marshal: %v", err)
	}
	var got Task
	if err := json.Unmarshal(raw, &got); err != nil {
		t.Fatalf("unmarshal: %v", err)
	}
	if got.AttemptID != "attempt-1" || got.DeadlineProfile.FirstEventMs != 90000 {
		t.Fatalf("attempt/deadline fields missing: %#v", got)
	}
	if got.PlanCapability == nil || got.PlanCapability.AttemptID != "attempt-1" {
		t.Fatalf("plan capability attempt missing: %#v", got.PlanCapability)
	}
}