diff --git a/AGENTS.md b/AGENTS.md index 42de6c3..130d5a0 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -9,6 +9,7 @@ This repository is open-source and must remain provider- and organization-agnost - Never add TextCortex-specific values to code, tests, docs, examples, or comments. - Never include organization-specific domains, project IDs, cluster names, account IDs, emails, tokens, or credentials. - Never use TextCortex hostnames in test fixtures. +- New or changed data structures, schemas, DTOs, CRDs, and API models must use `type` as the field name, never `kind`. If an external payload already sends `kind`, convert it at the boundary with an alias and keep the internal field named `type`. Use neutral placeholders instead: diff --git a/api/channel_conversation_service.go b/api/channel_conversation_service.go index cc9914f..0244899 100644 --- a/api/channel_conversation_service.go +++ b/api/channel_conversation_service.go @@ -3,6 +3,7 @@ package main import ( "crypto/sha256" "encoding/hex" + "encoding/json" "fmt" "net/http" "strings" @@ -14,18 +15,21 @@ import ( ) const ( - channelConversationRouteLabelKey = "spritz.sh/channel-route" - channelConversationPrincipalAnnotationKey = "spritz.sh/channel-principal-id" - channelConversationProviderAnnotationKey = "spritz.sh/channel-provider" - channelConversationExternalScopeTypeAnnotationKey = "spritz.sh/channel-external-scope-type" - channelConversationExternalTenantIDAnnotationKey = "spritz.sh/channel-external-tenant-id" - channelConversationExternalChannelIDAnnotationKey = "spritz.sh/channel-external-channel-id" - channelConversationExternalConversationIDAnnotationKey = "spritz.sh/channel-external-conversation-id" + channelConversationRouteLabelKey = "spritz.sh/channel-route" + channelConversationPrincipalAnnotationKey = "spritz.sh/channel-principal-id" + channelConversationProviderAnnotationKey = "spritz.sh/channel-provider" + channelConversationExternalScopeTypeAnnotationKey = "spritz.sh/channel-external-scope-type" + channelConversationExternalTenantIDAnnotationKey = "spritz.sh/channel-external-tenant-id" + channelConversationExternalChannelIDAnnotationKey = "spritz.sh/channel-external-channel-id" + channelConversationExternalConversationIDAnnotationKey = "spritz.sh/channel-external-conversation-id" + channelConversationExternalConversationAliasesAnnotationKey = "spritz.sh/channel-external-conversation-aliases" + channelConversationBaseRouteLabelKey = "spritz.sh/channel-route-base" ) type channelConversationUpsertRequest struct { RequestID string `json:"requestId,omitempty"` Namespace string `json:"namespace,omitempty"` + ConversationID string `json:"conversationId,omitempty"` PrincipalID string `json:"principalId"` InstanceID string `json:"instanceId"` OwnerID string `json:"ownerId"` @@ -52,6 +56,7 @@ type normalizedChannelConversationIdentity struct { func normalizeChannelConversationUpsertRequest(body channelConversationUpsertRequest) (channelConversationUpsertRequest, normalizedChannelConversationIdentity, error) { body.RequestID = strings.TrimSpace(body.RequestID) body.Namespace = strings.TrimSpace(body.Namespace) + body.ConversationID = strings.TrimSpace(body.ConversationID) body.PrincipalID = strings.TrimSpace(body.PrincipalID) body.InstanceID = sanitizeSpritzNameToken(body.InstanceID) body.OwnerID = strings.TrimSpace(body.OwnerID) @@ -113,6 +118,19 @@ func channelConversationRouteHash(identity normalizedChannelConversationIdentity return hex.EncodeToString(sum[:16]) } +func channelConversationBaseRouteHash(identity normalizedChannelConversationIdentity, ownerID, instanceID string) string { + sum := sha256.Sum256([]byte(strings.Join([]string{ + identity.principalID, + identity.provider, + identity.externalScopeType, + identity.externalTenantID, + identity.externalChannelID, + strings.TrimSpace(ownerID), + strings.TrimSpace(instanceID), + }, "\n"))) + return hex.EncodeToString(sum[:16]) +} + func channelConversationName(spritzName, ownerID string, identity normalizedChannelConversationIdentity) string { prefix := strings.ToLower(strings.TrimSpace(spritzName)) prefix = strings.Trim(prefix, "-") @@ -137,7 +155,7 @@ func channelConversationName(spritzName, ownerID string, identity normalizedChan return fmt.Sprintf("%s-%s", prefix, suffix) } -func channelConversationMatchesIdentity(conversation *spritzv1.SpritzConversation, identity normalizedChannelConversationIdentity) bool { +func channelConversationMatchesBaseIdentity(conversation *spritzv1.SpritzConversation, identity normalizedChannelConversationIdentity) bool { if conversation == nil { return false } @@ -145,8 +163,102 @@ func channelConversationMatchesIdentity(conversation *spritzv1.SpritzConversatio strings.TrimSpace(conversation.Annotations[channelConversationProviderAnnotationKey]) == identity.provider && strings.TrimSpace(conversation.Annotations[channelConversationExternalScopeTypeAnnotationKey]) == identity.externalScopeType && strings.TrimSpace(conversation.Annotations[channelConversationExternalTenantIDAnnotationKey]) == identity.externalTenantID && - strings.TrimSpace(conversation.Annotations[channelConversationExternalChannelIDAnnotationKey]) == identity.externalChannelID && - strings.TrimSpace(conversation.Annotations[channelConversationExternalConversationIDAnnotationKey]) == identity.externalConversationID + strings.TrimSpace(conversation.Annotations[channelConversationExternalChannelIDAnnotationKey]) == identity.externalChannelID +} + +func channelConversationExternalConversationAliases(conversation *spritzv1.SpritzConversation) []string { + if conversation == nil { + return nil + } + raw := strings.TrimSpace(conversation.Annotations[channelConversationExternalConversationAliasesAnnotationKey]) + if raw == "" { + return nil + } + var payload []string + if err := json.Unmarshal([]byte(raw), &payload); err != nil { + return nil + } + primary := strings.TrimSpace(conversation.Annotations[channelConversationExternalConversationIDAnnotationKey]) + aliases := make([]string, 0, len(payload)) + seen := map[string]struct{}{} + for _, candidate := range payload { + candidate = strings.TrimSpace(candidate) + if candidate == "" || candidate == primary { + continue + } + if _, ok := seen[candidate]; ok { + continue + } + seen[candidate] = struct{}{} + aliases = append(aliases, candidate) + } + return aliases +} + +func channelConversationHasExternalConversationID(conversation *spritzv1.SpritzConversation, externalConversationID string) bool { + externalConversationID = strings.TrimSpace(externalConversationID) + if externalConversationID == "" || conversation == nil { + return false + } + if strings.TrimSpace(conversation.Annotations[channelConversationExternalConversationIDAnnotationKey]) == externalConversationID { + return true + } + for _, alias := range channelConversationExternalConversationAliases(conversation) { + if alias == externalConversationID { + return true + } + } + return false +} + +func channelConversationMatchesIdentity(conversation *spritzv1.SpritzConversation, identity normalizedChannelConversationIdentity) bool { + return channelConversationMatchesBaseIdentity(conversation, identity) && + channelConversationHasExternalConversationID(conversation, identity.externalConversationID) +} + +func channelConversationBelongsToSpritz(conversation *spritzv1.SpritzConversation, spritz *spritzv1.Spritz) bool { + if conversation == nil || spritz == nil { + return false + } + return strings.TrimSpace(conversation.Spec.SpritzName) == spritz.Name && + strings.TrimSpace(conversation.Spec.Owner.ID) == spritz.Spec.Owner.ID && + strings.TrimSpace(conversation.Labels[acpConversationSpritzLabelKey]) == spritz.Name && + strings.TrimSpace(conversation.Labels[acpConversationOwnerLabelKey]) == ownerLabelValue(spritz.Spec.Owner.ID) +} + +func appendChannelConversationAlias(conversation *spritzv1.SpritzConversation, externalConversationID string) (bool, error) { + externalConversationID = strings.TrimSpace(externalConversationID) + if externalConversationID == "" || conversation == nil { + return false, nil + } + if conversation.Annotations == nil { + conversation.Annotations = map[string]string{} + } + if channelConversationHasExternalConversationID(conversation, externalConversationID) { + return false, nil + } + aliases := append(channelConversationExternalConversationAliases(conversation), externalConversationID) + payload, err := json.Marshal(aliases) + if err != nil { + return false, err + } + conversation.Annotations[channelConversationExternalConversationAliasesAnnotationKey] = string(payload) + return true, nil +} + +func ensureChannelConversationBaseRouteLabel(conversation *spritzv1.SpritzConversation, identity normalizedChannelConversationIdentity, spritz *spritzv1.Spritz) bool { + if conversation == nil || spritz == nil { + return false + } + if conversation.Labels == nil { + conversation.Labels = map[string]string{} + } + expected := channelConversationBaseRouteHash(identity, spritz.Spec.Owner.ID, spritz.Name) + if strings.TrimSpace(conversation.Labels[channelConversationBaseRouteLabelKey]) == expected { + return false + } + conversation.Labels[channelConversationBaseRouteLabelKey] = expected + return true } func (s *server) getAdminScopedACPReadySpritz(c echo.Context, namespace, instanceID, ownerID string) (*spritzv1.Spritz, error) { @@ -164,10 +276,10 @@ func (s *server) getAdminScopedACPReadySpritz(c echo.Context, namespace, instanc } func (s *server) findChannelConversation(c echo.Context, namespace string, spritz *spritzv1.Spritz, identity normalizedChannelConversationIdentity) (*spritzv1.SpritzConversation, bool, error) { - list := &spritzv1.SpritzConversationList{} + exactList := &spritzv1.SpritzConversationList{} if err := s.client.List( c.Request().Context(), - list, + exactList, client.InNamespace(namespace), client.MatchingLabels{ acpConversationLabelKey: acpConversationLabelValue, @@ -179,8 +291,8 @@ func (s *server) findChannelConversation(c echo.Context, namespace string, sprit return nil, false, err } var match *spritzv1.SpritzConversation - for i := range list.Items { - item := &list.Items[i] + for i := range exactList.Items { + item := &exactList.Items[i] if !channelConversationMatchesIdentity(item, identity) { continue } @@ -189,6 +301,38 @@ func (s *server) findChannelConversation(c echo.Context, namespace string, sprit } match = item.DeepCopy() } + + baseList := &spritzv1.SpritzConversationList{} + if err := s.client.List( + c.Request().Context(), + baseList, + client.InNamespace(namespace), + client.MatchingLabels{ + acpConversationLabelKey: acpConversationLabelValue, + acpConversationOwnerLabelKey: ownerLabelValue(spritz.Spec.Owner.ID), + acpConversationSpritzLabelKey: spritz.Name, + channelConversationBaseRouteLabelKey: channelConversationBaseRouteHash( + identity, + spritz.Spec.Owner.ID, + spritz.Name, + ), + }, + ); err != nil { + return nil, false, err + } + for i := range baseList.Items { + item := &baseList.Items[i] + if !channelConversationMatchesIdentity(item, identity) { + continue + } + if match != nil && item.Name == match.Name { + continue + } + if match != nil { + return nil, true, echo.NewHTTPError(http.StatusConflict, "channel conversation is ambiguous") + } + match = item.DeepCopy() + } if match == nil { return nil, false, nil } @@ -203,6 +347,7 @@ func applyChannelConversationMetadata(conversation *spritzv1.SpritzConversation, conversation.Labels[acpConversationSpritzLabelKey] = spritz.Name conversation.Labels[acpConversationLabelKey] = acpConversationLabelValue conversation.Labels[channelConversationRouteLabelKey] = channelConversationRouteHash(identity, spritz.Spec.Owner.ID, spritz.Name) + conversation.Labels[channelConversationBaseRouteLabelKey] = channelConversationBaseRouteHash(identity, spritz.Spec.Owner.ID, spritz.Name) if conversation.Annotations == nil { conversation.Annotations = map[string]string{} diff --git a/api/channel_conversations.go b/api/channel_conversations.go index a0d672b..9e2c6f7 100644 --- a/api/channel_conversations.go +++ b/api/channel_conversations.go @@ -53,6 +53,44 @@ func (s *server) upsertChannelConversation(c echo.Context) error { return s.writeACPResourceError(c, err) } } + if normalizedBody.ConversationID != "" { + existing := &spritzv1.SpritzConversation{} + if err := s.client.Get(c.Request().Context(), clientKey(namespace, normalizedBody.ConversationID), existing); err != nil { + return s.writeACPResourceError(c, err) + } + if !channelConversationMatchesBaseIdentity(existing, identity) || !channelConversationBelongsToSpritz(existing, spritz) { + return writeError(c, http.StatusConflict, "channel conversation is ambiguous") + } + conversation, found, err := s.findChannelConversation(c, namespace, spritz, identity) + if err != nil { + if httpErr, ok := err.(*echo.HTTPError); ok { + return writeError(c, httpErr.Code, httpErr.Message.(string)) + } + return writeError(c, http.StatusInternalServerError, err.Error()) + } + if found && conversation.Name != existing.Name { + return writeError(c, http.StatusConflict, "channel conversation is ambiguous") + } + changed := ensureChannelConversationBaseRouteLabel(existing, identity, spritz) + aliasChanged, err := appendChannelConversationAlias(existing, identity.externalConversationID) + if err != nil { + return writeError(c, http.StatusInternalServerError, err.Error()) + } + changed = changed || aliasChanged + if normalizedBody.RequestID != "" { + if existing.Annotations == nil { + existing.Annotations = map[string]string{} + } + existing.Annotations[requestIDAnnotationKey] = normalizedBody.RequestID + changed = true + } + if changed { + if err := s.client.Update(c.Request().Context(), existing); err != nil { + return s.writeACPResourceError(c, err) + } + } + return writeJSON(c, http.StatusOK, map[string]any{"created": false, "conversation": existing}) + } conversation, found, err := s.findChannelConversation(c, namespace, spritz, identity) if err != nil { if httpErr, ok := err.(*echo.HTTPError); ok { diff --git a/api/channel_conversations_test.go b/api/channel_conversations_test.go index 348c613..779a4d0 100644 --- a/api/channel_conversations_test.go +++ b/api/channel_conversations_test.go @@ -2,6 +2,8 @@ package main import ( "context" + "crypto/sha256" + "encoding/hex" "encoding/json" "net/http" "net/http/httptest" @@ -186,6 +188,316 @@ func TestUpsertChannelConversationReusesExistingConversation(t *testing.T) { } } +func TestUpsertChannelConversationPersistsAndResolvesReplyAliases(t *testing.T) { + s := newChannelConversationsTestServer(t, readyACPSpritz("zeno-acme", "owner-123")) + e := echo.New() + s.registerRoutes(e) + + createRec := httptest.NewRecorder() + e.ServeHTTP(createRec, newChannelConversationsRequest(`{ + "principalId":"shared-slack-gateway", + "instanceId":"zeno-acme", + "ownerId":"owner-123", + "provider":"slack", + "externalScopeType":"workspace", + "externalTenantId":"T_workspace_1", + "externalChannelId":"C_channel_1", + "externalConversationId":"1711387375.000100", + "title":"Slack concierge" + }`)) + if createRec.Code != http.StatusCreated { + t.Fatalf("expected first request to create, got %d: %s", createRec.Code, createRec.Body.String()) + } + var createPayload struct { + Data struct { + Conversation spritzv1.SpritzConversation `json:"conversation"` + } `json:"data"` + } + if err := json.Unmarshal(createRec.Body.Bytes(), &createPayload); err != nil { + t.Fatalf("failed to decode create response: %v", err) + } + + aliasRec := httptest.NewRecorder() + e.ServeHTTP(aliasRec, newChannelConversationsRequest(`{ + "principalId":"shared-slack-gateway", + "instanceId":"zeno-acme", + "ownerId":"owner-123", + "conversationId":"`+createPayload.Data.Conversation.Name+`", + "provider":"slack", + "externalScopeType":"workspace", + "externalTenantId":"T_workspace_1", + "externalChannelId":"C_channel_1", + "externalConversationId":"1711387376.000100", + "title":"Slack concierge" + }`)) + if aliasRec.Code != http.StatusOK { + t.Fatalf("expected alias request to reuse, got %d: %s", aliasRec.Code, aliasRec.Body.String()) + } + + reuseRec := httptest.NewRecorder() + e.ServeHTTP(reuseRec, newChannelConversationsRequest(`{ + "principalId":"shared-slack-gateway", + "instanceId":"zeno-acme", + "ownerId":"owner-123", + "provider":"slack", + "externalScopeType":"workspace", + "externalTenantId":"T_workspace_1", + "externalChannelId":"C_channel_1", + "externalConversationId":"1711387376.000100", + "title":"Slack concierge" + }`)) + if reuseRec.Code != http.StatusOK { + t.Fatalf("expected alias lookup to reuse, got %d: %s", reuseRec.Code, reuseRec.Body.String()) + } + + var reusePayload struct { + Data struct { + Created bool `json:"created"` + Conversation spritzv1.SpritzConversation `json:"conversation"` + } `json:"data"` + } + if err := json.Unmarshal(reuseRec.Body.Bytes(), &reusePayload); err != nil { + t.Fatalf("failed to decode reuse response: %v", err) + } + if reusePayload.Data.Created { + t.Fatalf("expected alias lookup to reuse the original conversation") + } + if reusePayload.Data.Conversation.Name != createPayload.Data.Conversation.Name { + t.Fatalf("expected alias lookup to reuse %q, got %q", createPayload.Data.Conversation.Name, reusePayload.Data.Conversation.Name) + } + aliases := channelConversationExternalConversationAliases(&reusePayload.Data.Conversation) + if len(aliases) != 1 || aliases[0] != "1711387376.000100" { + t.Fatalf("expected persisted alias, got %#v", aliases) + } +} + +func legacyChannelConversationRouteHash(identity normalizedChannelConversationIdentity, ownerID, instanceID string) string { + sum := sha256.Sum256([]byte(strings.Join([]string{ + identity.principalID, + identity.provider, + identity.externalScopeType, + identity.externalTenantID, + identity.externalChannelID, + identity.externalConversationID, + strings.TrimSpace(ownerID), + strings.TrimSpace(instanceID), + }, "\n"))) + return hex.EncodeToString(sum[:16]) +} + +func TestUpsertChannelConversationReusesLegacyConversationWithoutBaseRouteLabel(t *testing.T) { + spritz := readyACPSpritz("zeno-acme", "owner-123") + identity := normalizedChannelConversationIdentity{ + principalID: "shared-slack-gateway", + provider: "slack", + externalScopeType: "workspace", + externalTenantID: "T_workspace_1", + externalChannelID: "C_channel_1", + externalConversationID: "1711387375.000100", + } + conversation, err := buildACPConversationResource(spritz, "Slack concierge", "") + if err != nil { + t.Fatalf("build conversation: %v", err) + } + conversation.Name = channelConversationName(spritz.Name, spritz.Spec.Owner.ID, identity) + conversation.Spec.Owner = spritz.Spec.Owner + conversation.Spec.SpritzName = spritz.Name + conversation.Labels = map[string]string{ + acpConversationLabelKey: acpConversationLabelValue, + acpConversationOwnerLabelKey: ownerLabelValue(spritz.Spec.Owner.ID), + acpConversationSpritzLabelKey: spritz.Name, + channelConversationRouteLabelKey: legacyChannelConversationRouteHash( + identity, + spritz.Spec.Owner.ID, + spritz.Name, + ), + } + conversation.Annotations = map[string]string{ + channelConversationPrincipalAnnotationKey: identity.principalID, + channelConversationProviderAnnotationKey: identity.provider, + channelConversationExternalScopeTypeAnnotationKey: identity.externalScopeType, + channelConversationExternalTenantIDAnnotationKey: identity.externalTenantID, + channelConversationExternalChannelIDAnnotationKey: identity.externalChannelID, + channelConversationExternalConversationIDAnnotationKey: identity.externalConversationID, + requestIDAnnotationKey: "legacy-request", + } + + s := newChannelConversationsTestServer(t, spritz, conversation) + e := echo.New() + s.registerRoutes(e) + + rec := httptest.NewRecorder() + e.ServeHTTP(rec, newChannelConversationsRequest(`{ + "principalId":"shared-slack-gateway", + "instanceId":"zeno-acme", + "ownerId":"owner-123", + "provider":"slack", + "externalScopeType":"workspace", + "externalTenantId":"T_workspace_1", + "externalChannelId":"C_channel_1", + "externalConversationId":"1711387375.000100", + "title":"Slack concierge" + }`)) + if rec.Code != http.StatusOK { + t.Fatalf("expected legacy conversation reuse, got %d: %s", rec.Code, rec.Body.String()) + } + var payload struct { + Data struct { + Created bool `json:"created"` + Conversation spritzv1.SpritzConversation `json:"conversation"` + } `json:"data"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &payload); err != nil { + t.Fatalf("decode response: %v", err) + } + if payload.Data.Created { + t.Fatalf("expected legacy conversation to be reused") + } + if payload.Data.Conversation.Name != conversation.Name { + t.Fatalf("expected legacy conversation %q, got %q", conversation.Name, payload.Data.Conversation.Name) + } +} + +func TestUpsertChannelConversationRejectsAliasForWrongSpritzConversation(t *testing.T) { + targetSpritz := readyACPSpritz("zeno-acme", "owner-123") + otherSpritz := readyACPSpritz("zeno-other", "owner-999") + identity := normalizedChannelConversationIdentity{ + principalID: "shared-slack-gateway", + provider: "slack", + externalScopeType: "workspace", + externalTenantID: "T_workspace_1", + externalChannelID: "C_channel_1", + externalConversationID: "1711387375.000100", + } + otherConversation, err := buildACPConversationResource(otherSpritz, "Slack concierge", "") + if err != nil { + t.Fatalf("build conversation: %v", err) + } + otherConversation.Name = channelConversationName(otherSpritz.Name, otherSpritz.Spec.Owner.ID, identity) + applyChannelConversationMetadata(otherConversation, identity, "other-request", otherSpritz) + + s := newChannelConversationsTestServer(t, targetSpritz, otherSpritz, otherConversation) + e := echo.New() + s.registerRoutes(e) + + rec := httptest.NewRecorder() + e.ServeHTTP(rec, newChannelConversationsRequest(`{ + "principalId":"shared-slack-gateway", + "instanceId":"zeno-acme", + "ownerId":"owner-123", + "conversationId":"`+otherConversation.Name+`", + "provider":"slack", + "externalScopeType":"workspace", + "externalTenantId":"T_workspace_1", + "externalChannelId":"C_channel_1", + "externalConversationId":"1711387376.000100", + "title":"Slack concierge" + }`)) + if rec.Code != http.StatusConflict { + t.Fatalf("expected alias against wrong spritz to conflict, got %d: %s", rec.Code, rec.Body.String()) + } +} + +func TestUpsertChannelConversationRejectsAliasWhenAnotherConversationAlreadyOwnsIt(t *testing.T) { + spritz := readyACPSpritz("zeno-acme", "owner-123") + rootIdentity := normalizedChannelConversationIdentity{ + principalID: "shared-slack-gateway", + provider: "slack", + externalScopeType: "workspace", + externalTenantID: "T_workspace_1", + externalChannelID: "C_channel_1", + externalConversationID: "1711387375.000100", + } + aliasIdentity := rootIdentity + aliasIdentity.externalConversationID = "1711387376.000100" + + rootConversation, err := buildACPConversationResource(spritz, "Slack concierge", "") + if err != nil { + t.Fatalf("build root conversation: %v", err) + } + rootConversation.Name = channelConversationName(spritz.Name, spritz.Spec.Owner.ID, rootIdentity) + applyChannelConversationMetadata(rootConversation, rootIdentity, "root-request", spritz) + + aliasConversation, err := buildACPConversationResource(spritz, "Slack concierge", "") + if err != nil { + t.Fatalf("build alias conversation: %v", err) + } + aliasConversation.Name = channelConversationName(spritz.Name, spritz.Spec.Owner.ID, aliasIdentity) + applyChannelConversationMetadata(aliasConversation, aliasIdentity, "alias-request", spritz) + + s := newChannelConversationsTestServer(t, spritz, rootConversation, aliasConversation) + e := echo.New() + s.registerRoutes(e) + + rec := httptest.NewRecorder() + e.ServeHTTP(rec, newChannelConversationsRequest(`{ + "principalId":"shared-slack-gateway", + "instanceId":"zeno-acme", + "ownerId":"owner-123", + "conversationId":"`+rootConversation.Name+`", + "provider":"slack", + "externalScopeType":"workspace", + "externalTenantId":"T_workspace_1", + "externalChannelId":"C_channel_1", + "externalConversationId":"1711387376.000100", + "title":"Slack concierge" + }`)) + if rec.Code != http.StatusConflict { + t.Fatalf("expected alias conflict when another conversation already owns it, got %d: %s", rec.Code, rec.Body.String()) + } +} + +func TestUpsertChannelConversationRejectsWhenExactAndAliasedMatchesConflict(t *testing.T) { + spritz := readyACPSpritz("zeno-acme", "owner-123") + exactIdentity := normalizedChannelConversationIdentity{ + principalID: "shared-slack-gateway", + provider: "slack", + externalScopeType: "workspace", + externalTenantID: "T_workspace_1", + externalChannelID: "C_channel_1", + externalConversationID: "1711387376.000100", + } + aliasedIdentity := exactIdentity + aliasedIdentity.externalConversationID = "1711387375.000100" + + exactConversation, err := buildACPConversationResource(spritz, "Slack concierge", "") + if err != nil { + t.Fatalf("build exact conversation: %v", err) + } + exactConversation.Name = channelConversationName(spritz.Name, spritz.Spec.Owner.ID, exactIdentity) + applyChannelConversationMetadata(exactConversation, exactIdentity, "exact-request", spritz) + + aliasedConversation, err := buildACPConversationResource(spritz, "Slack concierge", "") + if err != nil { + t.Fatalf("build aliased conversation: %v", err) + } + aliasedConversation.Name = channelConversationName(spritz.Name, spritz.Spec.Owner.ID, aliasedIdentity) + applyChannelConversationMetadata(aliasedConversation, aliasedIdentity, "aliased-request", spritz) + if _, err := appendChannelConversationAlias(aliasedConversation, exactIdentity.externalConversationID); err != nil { + t.Fatalf("append alias: %v", err) + } + + s := newChannelConversationsTestServer(t, spritz, exactConversation, aliasedConversation) + e := echo.New() + s.registerRoutes(e) + + rec := httptest.NewRecorder() + e.ServeHTTP(rec, newChannelConversationsRequest(`{ + "principalId":"shared-slack-gateway", + "instanceId":"zeno-acme", + "ownerId":"owner-123", + "provider":"slack", + "externalScopeType":"workspace", + "externalTenantId":"T_workspace_1", + "externalChannelId":"C_channel_1", + "externalConversationId":"1711387376.000100", + "title":"Slack concierge" + }`)) + if rec.Code != http.StatusConflict { + t.Fatalf("expected ambiguity conflict when exact and aliased matches disagree, got %d: %s", rec.Code, rec.Body.String()) + } +} + func TestUpsertChannelConversationPreservesExistingTitleAndCWD(t *testing.T) { s := newChannelConversationsTestServer(t, readyACPSpritz("zeno-acme", "owner-123")) e := echo.New() diff --git a/api/channel_routes_test.go b/api/channel_routes_test.go index 6c687e0..a926c69 100644 --- a/api/channel_routes_test.go +++ b/api/channel_routes_test.go @@ -37,10 +37,10 @@ func TestResolveChannelRouteReturnsResolvedInstance(t *testing.T) { s.extensions = extensionRegistry{ resolvers: []configuredResolver{ { - id: "channel-routing", - kind: extensionKindResolver, - operation: extensionOperation("channel.route.resolve"), - match: extensionMatchRule{}, + id: "channel-routing", + extensionType: extensionTypeResolver, + operation: extensionOperation("channel.route.resolve"), + match: extensionMatchRule{}, transport: configuredHTTPTransport{ url: "http://resolver.example.test/channel-route", }, @@ -167,10 +167,10 @@ func TestResolveChannelRouteAllowsAuthDisabledMode(t *testing.T) { extensions: extensionRegistry{ resolvers: []configuredResolver{ { - id: "channel-routing", - kind: extensionKindResolver, - operation: extensionOperationChannelRouteResolve, - match: extensionMatchRule{}, + id: "channel-routing", + extensionType: extensionTypeResolver, + operation: extensionOperationChannelRouteResolve, + match: extensionMatchRule{}, transport: configuredHTTPTransport{ url: "http://resolver.example.test/channel-route", }, @@ -211,10 +211,10 @@ func TestResolveChannelRouteFallsBackToServerNamespace(t *testing.T) { s.extensions = extensionRegistry{ resolvers: []configuredResolver{ { - id: "channel-routing", - kind: extensionKindResolver, - operation: extensionOperationChannelRouteResolve, - match: extensionMatchRule{}, + id: "channel-routing", + extensionType: extensionTypeResolver, + operation: extensionOperationChannelRouteResolve, + match: extensionMatchRule{}, transport: configuredHTTPTransport{ url: "http://resolver.example.test/channel-route", }, @@ -259,10 +259,10 @@ func TestResolveChannelRouteFallsBackToDefaultNamespace(t *testing.T) { s.extensions = extensionRegistry{ resolvers: []configuredResolver{ { - id: "channel-routing", - kind: extensionKindResolver, - operation: extensionOperationChannelRouteResolve, - match: extensionMatchRule{}, + id: "channel-routing", + extensionType: extensionTypeResolver, + operation: extensionOperationChannelRouteResolve, + match: extensionMatchRule{}, transport: configuredHTTPTransport{ url: "http://resolver.example.test/channel-route", }, @@ -308,10 +308,10 @@ func TestResolveChannelRouteRejectsResolverNamespaceMismatch(t *testing.T) { s.extensions = extensionRegistry{ resolvers: []configuredResolver{ { - id: "channel-routing", - kind: extensionKindResolver, - operation: extensionOperationChannelRouteResolve, - match: extensionMatchRule{}, + id: "channel-routing", + extensionType: extensionTypeResolver, + operation: extensionOperationChannelRouteResolve, + match: extensionMatchRule{}, transport: configuredHTTPTransport{ url: "http://resolver.example.test/channel-route", }, diff --git a/api/create_admission_test.go b/api/create_admission_test.go index 747d1e3..2c8d742 100644 --- a/api/create_admission_test.go +++ b/api/create_admission_test.go @@ -42,9 +42,9 @@ func configurePresetResolverTestServer(s *server, resolverURL string) { if strings.TrimSpace(resolverURL) != "" { s.extensions = extensionRegistry{ resolvers: []configuredResolver{{ - id: "runtime-binding", - kind: extensionKindResolver, - operation: extensionOperationPresetCreateResolve, + id: "runtime-binding", + extensionType: extensionTypeResolver, + operation: extensionOperationPresetCreateResolve, match: extensionMatchRule{ presetIDs: map[string]struct{}{"zeno": {}}, }, @@ -266,9 +266,9 @@ func TestCreateSpritzProvisionerPresetResolverReplaysWithResolvedBinding(t *test } s.extensions = extensionRegistry{ resolvers: []configuredResolver{{ - id: "runtime-binding", - kind: extensionKindResolver, - operation: extensionOperationPresetCreateResolve, + id: "runtime-binding", + extensionType: extensionTypeResolver, + operation: extensionOperationPresetCreateResolve, match: extensionMatchRule{ presetIDs: map[string]struct{}{"zeno": {}}, }, @@ -363,9 +363,9 @@ func TestCreateSpritzProvisionerRejectsDisallowedPresetBeforeResolver(t *testing } s.extensions = extensionRegistry{ resolvers: []configuredResolver{{ - id: "runtime-binding", - kind: extensionKindResolver, - operation: extensionOperationPresetCreateResolve, + id: "runtime-binding", + extensionType: extensionTypeResolver, + operation: extensionOperationPresetCreateResolve, match: extensionMatchRule{ presetIDs: map[string]struct{}{"zeno": {}}, }, diff --git a/api/extensions.go b/api/extensions.go index 563a653..4ef468e 100644 --- a/api/extensions.go +++ b/api/extensions.go @@ -20,12 +20,12 @@ const ( extensionsEnvKey = "SPRITZ_EXTENSIONS_JSON" ) -type extensionKind string +type extensionType string const ( - extensionKindResolver extensionKind = "resolver" - extensionKindAuthProvider extensionKind = "auth_provider" - extensionKindLifecycleHook extensionKind = "lifecycle_hook" + extensionTypeResolver extensionType = "resolver" + extensionTypeAuthProvider extensionType = "auth_provider" + extensionTypeLifecycleHook extensionType = "lifecycle_hook" ) type extensionOperation string @@ -60,7 +60,7 @@ type extensionRegistry struct { type extensionConfigInput struct { ID string `json:"id"` - Kind string `json:"kind"` + Type string `json:"type"` Operation string `json:"operation"` Match extensionMatchInput `json:"match,omitempty"` Transport extensionTransportInput `json:"transport,omitempty"` @@ -98,7 +98,7 @@ type extensionRequestContext struct { type extensionResolverRequestEnvelope struct { Version string `json:"version"` ExtensionID string `json:"extensionId"` - Kind extensionKind `json:"kind"` + Type extensionType `json:"type"` Operation extensionOperation `json:"operation"` RequestID string `json:"requestId,omitempty"` Principal extensionPrincipalPayload `json:"principal"` @@ -123,11 +123,11 @@ type extensionResolverSpecMutation struct { } type configuredResolver struct { - id string - kind extensionKind - operation extensionOperation - match extensionMatchRule - transport configuredHTTPTransport + id string + extensionType extensionType + operation extensionOperation + match extensionMatchRule + transport configuredHTTPTransport } type extensionMatchRule struct { @@ -192,16 +192,16 @@ func newExtensionRegistry() (extensionRegistry, error) { } seen[id] = struct{}{} - kind := normalizeExtensionKind(input.Kind) - if kind == "" { - return extensionRegistry{}, fmt.Errorf("invalid %s: extensions[%d].kind is required", extensionsEnvKey, index) + extensionType := normalizeExtensionType(input.Type) + if extensionType == "" { + return extensionRegistry{}, fmt.Errorf("invalid %s: extensions[%d].type is required", extensionsEnvKey, index) } operation := normalizeExtensionOperation(input.Operation) if operation == "" { return extensionRegistry{}, fmt.Errorf("invalid %s: extensions[%d].operation is required and must be supported", extensionsEnvKey, index) } - if kind != extensionKindResolver { - return extensionRegistry{}, fmt.Errorf("invalid %s: extensions[%d].kind %q is not yet supported", extensionsEnvKey, index, kind) + if extensionType != extensionTypeResolver { + return extensionRegistry{}, fmt.Errorf("invalid %s: extensions[%d].type %q is not yet supported", extensionsEnvKey, index, extensionType) } match, err := normalizeExtensionMatch(input.Match) if err != nil { @@ -212,24 +212,24 @@ func newExtensionRegistry() (extensionRegistry, error) { return extensionRegistry{}, fmt.Errorf("invalid %s: extensions[%d].transport %v", extensionsEnvKey, index, err) } registry.resolvers = append(registry.resolvers, configuredResolver{ - id: id, - kind: kind, - operation: operation, - match: match, - transport: transport, + id: id, + extensionType: extensionType, + operation: operation, + match: match, + transport: transport, }) } return registry, nil } -func normalizeExtensionKind(raw string) extensionKind { - switch extensionKind(strings.ToLower(strings.TrimSpace(raw))) { - case extensionKindResolver: - return extensionKindResolver - case extensionKindAuthProvider: - return extensionKindAuthProvider - case extensionKindLifecycleHook: - return extensionKindLifecycleHook +func normalizeExtensionType(raw string) extensionType { + switch extensionType(strings.ToLower(strings.TrimSpace(raw))) { + case extensionTypeResolver: + return extensionTypeResolver + case extensionTypeAuthProvider: + return extensionTypeAuthProvider + case extensionTypeLifecycleHook: + return extensionTypeLifecycleHook default: return "" } @@ -294,11 +294,11 @@ func normalizePresetIDSet(values []string) (map[string]struct{}, error) { } func normalizeExtensionTransport(input extensionTransportInput) (configuredHTTPTransport, error) { - kind := extensionTransportType(strings.ToLower(strings.TrimSpace(input.Type))) - if kind == "" { - kind = extensionTransportHTTP + transportType := extensionTransportType(strings.ToLower(strings.TrimSpace(input.Type))) + if transportType == "" { + transportType = extensionTransportHTTP } - if kind != extensionTransportHTTP { + if transportType != extensionTransportHTTP { return configuredHTTPTransport{}, fmt.Errorf("type must be http") } urlValue := strings.TrimSpace(input.URL) @@ -447,7 +447,7 @@ func (r configuredResolver) resolve( envelope := extensionResolverRequestEnvelope{ Version: "v1", ExtensionID: r.id, - Kind: r.kind, + Type: r.extensionType, Operation: r.operation, RequestID: strings.TrimSpace(requestID), Principal: extensionPrincipalPayload{ diff --git a/api/extensions_test.go b/api/extensions_test.go index a2f7256..8c212a8 100644 --- a/api/extensions_test.go +++ b/api/extensions_test.go @@ -5,27 +5,27 @@ import ( "testing" ) -func TestNewExtensionRegistryRejectsUnsupportedKind(t *testing.T) { +func TestNewExtensionRegistryRejectsUnsupportedType(t *testing.T) { t.Setenv(extensionsEnvKey, `[{ "id": "login-metadata", - "kind": "auth_provider", + "type": "auth_provider", "operation": "auth.login.metadata", "transport": {"url": "https://example.com/internal/extensions/login"} }]`) _, err := newExtensionRegistry() if err == nil { - t.Fatal("expected unsupported extension kind error") + t.Fatal("expected unsupported extension type error") } if !strings.Contains(err.Error(), "not yet supported") { - t.Fatalf("expected unsupported kind error, got %v", err) + t.Fatalf("expected unsupported type error, got %v", err) } } func TestNewExtensionRegistryRejectsUnknownOperation(t *testing.T) { t.Setenv(extensionsEnvKey, `[{ "id": "runtime-binding", - "kind": "resolver", + "type": "resolver", "operation": "preset.create.typo", "transport": {"url": "https://example.com/internal/extensions/preset-create"} }]`) @@ -42,7 +42,7 @@ func TestNewExtensionRegistryRejectsUnknownOperation(t *testing.T) { func TestNewExtensionRegistryAcceptsChannelRouteResolveOperation(t *testing.T) { t.Setenv(extensionsEnvKey, `[{ "id": "channel-routing", - "kind": "resolver", + "type": "resolver", "operation": "channel.route.resolve", "transport": {"url": "https://example.com/internal/extensions/channel-routing"} }]`) @@ -75,7 +75,7 @@ func TestNormalizeExtensionMatchSanitizesPresetIDs(t *testing.T) { func TestNewExtensionRegistryRejectsInvalidSanitizedPresetID(t *testing.T) { t.Setenv(extensionsEnvKey, `[{ "id": "runtime-binding", - "kind": "resolver", + "type": "resolver", "operation": "preset.create.resolve", "match": {"presetIds": ["!!!"]}, "transport": {"url": "https://example.com/internal/extensions/preset-create"} diff --git a/api/runtime_bindings.go b/api/runtime_bindings.go index 5c40ee5..492eb45 100644 --- a/api/runtime_bindings.go +++ b/api/runtime_bindings.go @@ -14,7 +14,7 @@ import ( type runtimeBindingOwnerPrincipal struct { ID string `json:"id"` - Kind string `json:"kind"` + Type string `json:"type"` } type runtimeBindingRuntimePrincipal struct { @@ -98,7 +98,7 @@ func buildRuntimeBindingResponse(spritz *spritzv1.Spritz) (runtimeBindingRespons Namespace: namespace, OwnerPrincipal: runtimeBindingOwnerPrincipal{ ID: ownerID, - Kind: "user", + Type: "user", }, RuntimePrincipal: runtimeBindingRuntimePrincipal{ AuthnMode: "workload_identity", diff --git a/docs/2026-03-19-runtime-binding-resolution-api-implementation-plan.md b/docs/2026-03-19-runtime-binding-resolution-api-implementation-plan.md index 6cba3ae..c74751f 100644 --- a/docs/2026-03-19-runtime-binding-resolution-api-implementation-plan.md +++ b/docs/2026-03-19-runtime-binding-resolution-api-implementation-plan.md @@ -104,7 +104,7 @@ Response shape: "namespace": "spritz-production", "ownerPrincipal": { "id": "user-123", - "kind": "user" + "type": "user" }, "runtimePrincipal": { "authnMode": "workload_identity", diff --git a/docs/2026-03-19-unified-extension-framework-architecture.md b/docs/2026-03-19-unified-extension-framework-architecture.md index 78943f3..cf40d87 100644 --- a/docs/2026-03-19-unified-extension-framework-architecture.md +++ b/docs/2026-03-19-unified-extension-framework-architecture.md @@ -245,7 +245,7 @@ the API layer. High-level model: - extensions are declared in API config, -- each extension has an `id`, `kind`, `operation`, and transport, +- each extension has an `id`, `type`, `operation`, and transport, - operations decide when an extension is invoked, - Spritz builds a standard request envelope, - the extension returns a standard response envelope, @@ -292,7 +292,7 @@ Emit post-decision or post-create hooks. This phase model is more durable than creating a separate subsystem for each new feature. -## Admission Kinds +## Admission Types ### Resolver @@ -326,7 +326,7 @@ Typical uses: - emit lifecycle events, - run bookkeeping after delete or expiration. -Resolvers are the most important initial kind because they cover the existing +Resolvers are the most important initial type because they cover the existing external owner flow and the create-time preset binding problem. ## InstanceClass as a First-Class Resource @@ -573,7 +573,7 @@ hooks, for example: api: extensions: - id: external-owner - kind: resolver + type: resolver operation: owner.resolve transport: type: http @@ -582,7 +582,7 @@ api: timeout: 5s - id: runtime-binding - kind: resolver + type: resolver operation: preset.create.resolve match: presetIds: [assistant-runtime] @@ -593,7 +593,7 @@ api: timeout: 5s - id: web-login - kind: auth_provider + type: auth_provider operation: auth.login.metadata provider: loginUrl: https://console.example.com/login @@ -603,7 +603,7 @@ api: Rules: - `id` MUST be unique. -- `kind` MUST be one of the supported extension kinds. +- `type` MUST be one of the supported extension types. - `operation` MUST be one of the supported extension operations. - `match` MAY further restrict invocation, such as by preset ID. - HTTP transport MUST support timeout and auth configuration. @@ -616,7 +616,7 @@ Spritz should call resolvers with one common request contract: { "version": "v1", "extensionId": "runtime-binding", - "kind": "resolver", + "type": "resolver", "operation": "preset.create.resolve", "requestId": "req-123", "principal": { diff --git a/docs/2026-03-23-shared-app-tenant-routing-architecture.md b/docs/2026-03-23-shared-app-tenant-routing-architecture.md index edfeaab..fc8fec6 100644 --- a/docs/2026-03-23-shared-app-tenant-routing-architecture.md +++ b/docs/2026-03-23-shared-app-tenant-routing-architecture.md @@ -155,7 +155,7 @@ Examples: ### External scope type -The provider-specific scope kind for the tenant ID. +The provider-specific scope type for the tenant ID. Examples: @@ -562,7 +562,7 @@ Example request envelope: { "version": "v1", "extensionId": "channel-routing", - "kind": "resolver", + "type": "resolver", "operation": "channel.route.resolve", "requestId": "req-123", "principal": { diff --git a/docs/2026-03-24-slack-channel-gateway-implementation-plan.md b/docs/2026-03-24-slack-channel-gateway-implementation-plan.md index bef1134..9447740 100644 --- a/docs/2026-03-24-slack-channel-gateway-implementation-plan.md +++ b/docs/2026-03-24-slack-channel-gateway-implementation-plan.md @@ -207,7 +207,8 @@ product requirement says otherwise: - event type - `channel_type` - channel id - - message ts or thread ts + - message ts + - thread ts when present - external sender id 4. Gateway rejects the request if `api_app_id` or `team_id` do not match the expected shared Slack app installation. @@ -306,13 +307,20 @@ include it directly in the inbound payload. Phase 1 should keep channel behavior predictable: - direct-message conversations reply inline -- channel conversations reply in thread by default -- if inbound Slack payload already has `thread_ts`, reuse it -- if inbound channel message is not already threaded, use the source message - `ts` as `thread_ts` - -That keeps public channels cleaner and gives the concierge a consistent reply -target. +- top-level channel turns reply top-level by default +- top-level channel turns use the source Slack message ts as the conversation + identity +- threaded channel turns use the thread root `thread_ts` as the conversation + identity +- if the gateway posts a visible top-level assistant reply, later user replies + threaded off that bot message must map back to the original source-message + conversation instead of forking a new one +- if inbound Slack payload already has `thread_ts`, reuse it for the outbound + reply so existing threaded follow-ups stay in that thread + +That matches the desired Zenobot-style room behavior: visible top-level replies +for normal channel turns, with stable follow-up context only when the user is +already continuing the same Slack root message or thread. ## Persisted Metadata diff --git a/integrations/slack-gateway/backend_client.go b/integrations/slack-gateway/backend_client.go index b4b99df..ab271db 100644 --- a/integrations/slack-gateway/backend_client.go +++ b/integrations/slack-gateway/backend_client.go @@ -110,9 +110,10 @@ func (g *slackGateway) exchangeChannelSession(ctx context.Context, teamID string }, nil } -func (g *slackGateway) upsertChannelConversation(ctx context.Context, session channelSession, event slackEventInner, teamID string) (string, error) { +func (g *slackGateway) upsertChannelConversation(ctx context.Context, session channelSession, event slackEventInner, teamID, conversationID, externalConversationID string) (string, error) { body := map[string]any{ "namespace": session.Namespace, + "conversationId": strings.TrimSpace(conversationID), "principalId": g.cfg.PrincipalID, "instanceId": session.InstanceID, "ownerId": session.OwnerAuthID, @@ -120,7 +121,7 @@ func (g *slackGateway) upsertChannelConversation(ctx context.Context, session ch "externalScopeType": slackWorkspaceScope, "externalTenantId": strings.TrimSpace(teamID), "externalChannelId": strings.TrimSpace(event.Channel), - "externalConversationId": slackExternalConversationID(event), + "externalConversationId": strings.TrimSpace(externalConversationID), "title": fmt.Sprintf("Slack %s", strings.TrimSpace(event.Channel)), "cwd": defaultConversationCWD, } @@ -150,7 +151,7 @@ func (g *slackGateway) bootstrapConversation(ctx context.Context, serviceToken, return sessionID, cwd, nil } -func (g *slackGateway) postSlackMessage(ctx context.Context, token, channel, text, threadTS string) error { +func (g *slackGateway) postSlackMessage(ctx context.Context, token, channel, text, threadTS string) (string, error) { body := map[string]any{ "channel": strings.TrimSpace(channel), "text": strings.TrimSpace(text), @@ -161,33 +162,34 @@ func (g *slackGateway) postSlackMessage(ctx context.Context, token, channel, tex target := g.cfg.SlackAPIBaseURL + "/chat.postMessage" payload, err := json.Marshal(body) if err != nil { - return err + return "", err } req, err := http.NewRequestWithContext(ctx, http.MethodPost, target, bytes.NewReader(payload)) if err != nil { - return err + return "", err } req.Header.Set("Authorization", "Bearer "+strings.TrimSpace(token)) req.Header.Set("Content-Type", "application/json") resp, err := g.httpClient.Do(req) if err != nil { - return err + return "", err } defer resp.Body.Close() if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices { - return fmt.Errorf("slack chat.postMessage failed: %s", resp.Status) + return "", fmt.Errorf("slack chat.postMessage failed: %s", resp.Status) } var result struct { OK bool `json:"ok"` + TS string `json:"ts,omitempty"` Error string `json:"error,omitempty"` } if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { - return err + return "", err } if !result.OK { - return fmt.Errorf("slack chat.postMessage failed: %s", strings.TrimSpace(result.Error)) + return "", fmt.Errorf("slack chat.postMessage failed: %s", strings.TrimSpace(result.Error)) } - return nil + return strings.TrimSpace(result.TS), nil } func (g *slackGateway) postBackendJSON(ctx context.Context, path string, body any, target any) error { diff --git a/integrations/slack-gateway/gateway_test.go b/integrations/slack-gateway/gateway_test.go index c9a4f34..b251597 100644 --- a/integrations/slack-gateway/gateway_test.go +++ b/integrations/slack-gateway/gateway_test.go @@ -345,8 +345,8 @@ func TestSlackEventRoutesToConversationAndReplies(t *testing.T) { } var channelConversationCall struct { sync.Mutex - authHeader string - payload map[string]any + authHeaders []string + payloads []map[string]any } slackAPI := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/chat.postMessage" { @@ -357,7 +357,7 @@ func TestSlackEventRoutesToConversationAndReplies(t *testing.T) { slackCalls.Lock() slackCalls.payloads = append(slackCalls.payloads, payload) slackCalls.Unlock() - writeJSON(w, http.StatusOK, map[string]any{"ok": true}) + writeJSON(w, http.StatusOK, map[string]any{"ok": true, "ts": "1711387376.000100"}) return } t.Fatalf("unexpected slack path %s", r.URL.Path) @@ -396,13 +396,19 @@ func TestSlackEventRoutesToConversationAndReplies(t *testing.T) { t.Fatalf("decode channel conversation body: %v", err) } channelConversationCall.Lock() - channelConversationCall.authHeader = r.Header.Get("Authorization") - channelConversationCall.payload = payload + channelConversationCall.authHeaders = append(channelConversationCall.authHeaders, r.Header.Get("Authorization")) + channelConversationCall.payloads = append(channelConversationCall.payloads, payload) channelConversationCall.Unlock() - writeJSON(w, http.StatusCreated, map[string]any{ + statusCode := http.StatusCreated + created := true + if strings.TrimSpace(fmt.Sprint(payload["conversationId"])) != "" { + statusCode = http.StatusOK + created = false + } + writeJSON(w, statusCode, map[string]any{ "status": "success", "data": map[string]any{ - "created": true, + "created": created, "conversation": map[string]any{ "metadata": map[string]any{"name": "conv-1"}, "spec": map[string]any{"cwd": "/home/dev"}, @@ -519,8 +525,8 @@ func TestSlackEventRoutesToConversationAndReplies(t *testing.T) { if !strings.Contains(fmt.Sprint(payload["text"]), "Hello from concierge") { t.Fatalf("expected assistant reply, got %#v", payload["text"]) } - if payload["thread_ts"] != "1711387375.000100" { - t.Fatalf("expected thread reply, got %#v", payload["thread_ts"]) + if _, ok := payload["thread_ts"]; ok { + t.Fatalf("expected top-level channel reply, got %#v", payload["thread_ts"]) } acpAuthHeaders.Lock() defer acpAuthHeaders.Unlock() @@ -534,11 +540,25 @@ func TestSlackEventRoutesToConversationAndReplies(t *testing.T) { } channelConversationCall.Lock() defer channelConversationCall.Unlock() - if channelConversationCall.authHeader != "Bearer owner-token" { - t.Fatalf("expected owner token for channel conversation upsert, got %q", channelConversationCall.authHeader) + if len(channelConversationCall.authHeaders) != 2 { + t.Fatalf("expected root upsert plus alias upsert, got %#v", channelConversationCall.authHeaders) + } + for _, authHeader := range channelConversationCall.authHeaders { + if authHeader != "Bearer owner-token" { + t.Fatalf("expected owner token for channel conversation upsert, got %q", authHeader) + } + } + if channelConversationCall.payloads[0]["principalId"] != "shared-slack-gateway" { + t.Fatalf("expected shared gateway principal in first channel conversation payload, got %#v", channelConversationCall.payloads[0]["principalId"]) } - if channelConversationCall.payload["principalId"] != "shared-slack-gateway" { - t.Fatalf("expected shared gateway principal in channel conversation payload, got %#v", channelConversationCall.payload["principalId"]) + if channelConversationCall.payloads[0]["externalConversationId"] != "1711387375.000100" { + t.Fatalf("expected root-message conversation identity, got %#v", channelConversationCall.payloads[0]["externalConversationId"]) + } + if channelConversationCall.payloads[1]["conversationId"] != "conv-1" { + t.Fatalf("expected alias upsert to target the created conversation, got %#v", channelConversationCall.payloads[1]["conversationId"]) + } + if channelConversationCall.payloads[1]["externalConversationId"] != "1711387376.000100" { + t.Fatalf("expected alias upsert to persist the bot reply ts, got %#v", channelConversationCall.payloads[1]["externalConversationId"]) } return } @@ -1256,6 +1276,8 @@ func TestUpsertChannelConversationUsesChannelForDirectMessages(t *testing.T) { TS: "1711387375.000100", }, "T_workspace_1", + "", + "D_workspace_bot", ) if err != nil { t.Fatalf("upsert channel conversation failed: %v", err) @@ -1399,6 +1421,33 @@ func TestSlackDirectMessageHelpersReuseSharedDetection(t *testing.T) { if slackReplyThreadTS(groupDM) != "" { t.Fatalf("expected mpim replies to stay inline") } + + topLevelChannel := slackEventInner{ + Type: "app_mention", + Channel: "C_workspace_channel", + ChannelType: "channel", + TS: "1711387375.000100", + } + if slackExternalConversationID(topLevelChannel) != "1711387375.000100" { + t.Fatalf("expected top-level channel messages to key by root message ts") + } + if slackReplyThreadTS(topLevelChannel) != "" { + t.Fatalf("expected top-level channel mentions to reply inline") + } + + threadedChannel := slackEventInner{ + Type: "app_mention", + Channel: "C_workspace_channel", + ChannelType: "channel", + ThreadTS: "1711387375.000100", + TS: "1711387376.000100", + } + if slackExternalConversationID(threadedChannel) != "1711387375.000100" { + t.Fatalf("expected threaded channel messages to key by thread root ts") + } + if slackReplyThreadTS(threadedChannel) != "1711387375.000100" { + t.Fatalf("expected threaded channel mentions to reply in-thread") + } } func TestPromptConversationRejectsInteractivePermissionRequests(t *testing.T) { @@ -1676,8 +1725,179 @@ func TestProcessMessageEventPostsFallbackAfterPromptTimeout(t *testing.T) { if got := slackPayloads.items[0]["text"]; got != "I hit an internal error while processing that request." { t.Fatalf("expected fallback reply text, got %#v", got) } - if got := slackPayloads.items[0]["thread_ts"]; got != "1711387375.000100" { - t.Fatalf("expected threaded fallback reply, got %#v", got) + if _, ok := slackPayloads.items[0]["thread_ts"]; ok { + t.Fatalf("expected top-level fallback reply, got %#v", slackPayloads.items[0]["thread_ts"]) + } +} + +func TestProcessMessageEventPersistsReplyAliasAfterPromptTimeout(t *testing.T) { + var channelConversationCalls struct { + sync.Mutex + payloads []map[string]any + } + slackAPI := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/chat.postMessage" { + t.Fatalf("unexpected slack path %s", r.URL.Path) + } + writeJSON(w, http.StatusOK, map[string]any{"ok": true, "ts": "1711387376.000100"}) + })) + defer slackAPI.Close() + + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/internal/v1/spritz/channel-sessions/exchange" { + t.Fatalf("unexpected backend path %s", r.URL.Path) + } + writeJSON(w, http.StatusOK, map[string]any{ + "status": "resolved", + "session": map[string]any{ + "accessToken": "owner-token", + "ownerAuthId": "owner-123", + "namespace": "spritz-staging", + "instanceId": "zeno-acme", + "providerAuth": map[string]any{ + "providerInstallRef": "cred_slack_workspace_1", + "apiAppId": "A_app_1", + "teamId": "T_workspace_1", + "botUserId": "U_bot", + "botAccessToken": "xoxb-installed", + }, + }, + }) + })) + defer backend.Close() + + upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }} + spritz := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/api/channel-conversations/upsert": + var payload map[string]any + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + t.Fatalf("decode channel conversation payload: %v", err) + } + channelConversationCalls.Lock() + channelConversationCalls.payloads = append(channelConversationCalls.payloads, payload) + channelConversationCalls.Unlock() + statusCode := http.StatusCreated + created := true + if strings.TrimSpace(fmt.Sprint(payload["conversationId"])) != "" { + statusCode = http.StatusOK + created = false + } + writeJSON(w, statusCode, map[string]any{ + "status": "success", + "data": map[string]any{ + "created": created, + "conversation": map[string]any{ + "metadata": map[string]any{"name": "conv-1"}, + "spec": map[string]any{"cwd": "/home/dev"}, + }, + }, + }) + case "/api/acp/conversations/conv-1/bootstrap": + writeJSON(w, http.StatusOK, map[string]any{ + "status": "success", + "data": map[string]any{ + "effectiveSessionId": "session-1", + "conversation": map[string]any{ + "metadata": map[string]any{"name": "conv-1"}, + "spec": map[string]any{"sessionId": "session-1", "cwd": "/home/dev"}, + }, + }, + }) + case "/api/acp/conversations/conv-1/connect": + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Fatalf("upgrade failed: %v", err) + } + defer conn.Close() + for { + _, payload, err := conn.ReadMessage() + if err != nil { + return + } + var message map[string]any + if err := json.Unmarshal(payload, &message); err != nil { + t.Fatalf("decode ws payload: %v", err) + } + switch message["method"] { + case "initialize": + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{"protocolVersion": 1}}) + case "session/load": + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{}}) + case "session/prompt": + _ = conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "method": "session/update", + "params": map[string]any{ + "update": map[string]any{ + "sessionUpdate": "agent_message_chunk", + "content": []map[string]any{{ + "type": "text", + "text": "partial reply", + }}, + }, + }, + }) + time.Sleep(40 * time.Millisecond) + return + default: + t.Fatalf("unexpected ACP method %#v", message["method"]) + } + } + default: + t.Fatalf("unexpected spritz path %s", r.URL.Path) + } + })) + defer spritz.Close() + + cfg := config{ + SlackAPIBaseURL: slackAPI.URL, + BackendBaseURL: backend.URL, + BackendInternalToken: "backend-internal-token", + SpritzBaseURL: spritz.URL, + SpritzServiceToken: "spritz-service-token", + PrincipalID: "shared-slack-gateway", + HTTPTimeout: 200 * time.Millisecond, + DedupeTTL: time.Minute, + } + gateway := newSlackGateway(cfg, slog.New(slog.NewTextHandler(io.Discard, nil))) + + envelope := slackEnvelope{ + APIAppID: "A_app_1", + TeamID: "T_workspace_1", + Event: slackEventInner{ + Type: "app_mention", + User: "U_user", + Text: "<@U_bot> hello", + Channel: "C_channel_1", + ChannelType: "channel", + TS: "1711387375.000100", + }, + } + delivery, process, err := gateway.beginMessageEventDelivery(envelope) + if err != nil { + t.Fatalf("beginMessageEventDelivery returned error: %v", err) + } + if !process { + t.Fatal("expected app mention to be processed") + } + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer cancel() + if err := gateway.processMessageEventWithDelivery(ctx, envelope, delivery); err != nil { + t.Fatalf("expected fallback reply flow to succeed, got %v", err) + } + + channelConversationCalls.Lock() + defer channelConversationCalls.Unlock() + if len(channelConversationCalls.payloads) != 2 { + t.Fatalf("expected root upsert plus alias persistence, got %#v", channelConversationCalls.payloads) + } + if channelConversationCalls.payloads[1]["conversationId"] != "conv-1" { + t.Fatalf("expected alias persistence to target conv-1, got %#v", channelConversationCalls.payloads[1]["conversationId"]) + } + if channelConversationCalls.payloads[1]["externalConversationId"] != "1711387376.000100" { + t.Fatalf("expected alias persistence to use the bot reply ts, got %#v", channelConversationCalls.payloads[1]["externalConversationId"]) } } diff --git a/integrations/slack-gateway/slack_events.go b/integrations/slack-gateway/slack_events.go index 74d628f..0eb5abe 100644 --- a/integrations/slack-gateway/slack_events.go +++ b/integrations/slack-gateway/slack_events.go @@ -211,7 +211,8 @@ func (g *slackGateway) processMessageEventWithDelivery( return nil } - conversationID, err := g.upsertChannelConversation(ctx, session, event, envelope.TeamID) + externalConversationID := slackExternalConversationID(event) + conversationID, err := g.upsertChannelConversation(ctx, session, event, envelope.TeamID, "", externalConversationID) if err != nil { return err } @@ -227,14 +228,37 @@ func (g *slackGateway) processMessageEventWithDelivery( reply = "I hit an internal error while processing that request." g.logger.Error("acp prompt failed", "error", err, "conversation_id", conversationID) } + replyThreadTS := slackReplyThreadTS(event) replyCtx, cancelReply := context.WithTimeout(context.WithoutCancel(ctx), g.cfg.HTTPTimeout) defer cancelReply() - if err := g.postSlackMessage(replyCtx, session.ProviderAuth.BotAccessToken, event.Channel, reply, slackReplyThreadTS(event)); err != nil { + replyMessageTS, err := g.postSlackMessage(replyCtx, session.ProviderAuth.BotAccessToken, event.Channel, reply, replyThreadTS) + if err != nil { // Once the ACP prompt has already been delivered, suppress duplicate // Slack retries from re-running the same agent side effects. success = promptSent return err } + if replyThreadTS == "" && !isSlackDirectMessageEvent(event) && strings.TrimSpace(replyMessageTS) != "" { + aliasCtx, cancelAlias := context.WithTimeout(context.WithoutCancel(ctx), g.cfg.HTTPTimeout) + if _, err := g.upsertChannelConversation( + aliasCtx, + session, + event, + envelope.TeamID, + conversationID, + replyMessageTS, + ); err != nil { + cancelAlias() + g.logger.Error( + "slack reply alias persistence failed", + "error", err, + "conversation_id", conversationID, + "reply_message_ts", replyMessageTS, + ) + } else { + cancelAlias() + } + } success = true return nil } @@ -280,17 +304,17 @@ func slackReplyThreadTS(event slackEventInner) string { if strings.TrimSpace(event.ThreadTS) != "" { return strings.TrimSpace(event.ThreadTS) } - if isSlackDirectMessageEvent(event) { - return "" - } - return strings.TrimSpace(event.TS) + return "" } func slackExternalConversationID(event slackEventInner) string { if isSlackDirectMessageEvent(event) { return strings.TrimSpace(event.Channel) } - return firstNonEmpty(strings.TrimSpace(event.ThreadTS), strings.TrimSpace(event.TS)) + if threadTS := strings.TrimSpace(event.ThreadTS); threadTS != "" { + return threadTS + } + return strings.TrimSpace(event.TS) } func (g *slackGateway) verifySlackSignature(header http.Header, body []byte) error {