Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,11 @@ func runGateway() {
// Register channels/instances/links/teams RPC methods
wireChannelRPCMethods(server, pgStores, channelMgr, agentRouter, msgBus)

// Register party mode WS RPC methods
if pgStores.Party != nil {
methods.NewPartyMethods(pgStores.Party, pgStores.Agents, providerRegistry, msgBus).Register(server.Router())
}

// Wire channel event subscribers (cache invalidation, pairing, cascade disable)
wireChannelEventSubscribers(msgBus, server, pgStores, channelMgr, instanceLoader, pairingMethods, cfg)

Expand Down
16 changes: 13 additions & 3 deletions cmd/gateway_consumer_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,20 @@ import (
// It extracts the agentID from the session key and routes to the correct agent loop.
func makeSchedulerRunFunc(agents *agent.Router, cfg *config.Config) scheduler.RunFunc {
return func(ctx context.Context, req agent.RunRequest) (*agent.RunResult, error) {
// Extract agentID from session key (format: agent:{agentId}:{rest})
// Extract agentID from session key.
// Supported formats:
// agent:{agentId}:{rest}
// delegate:{sourceUUID8}:{targetAgentKey}:{delegationId}
agentID := cfg.ResolveDefaultAgentID()
if parts := strings.SplitN(req.SessionKey, ":", 3); len(parts) >= 2 && parts[0] == "agent" {
agentID = parts[1]
if parts := strings.SplitN(req.SessionKey, ":", 4); len(parts) >= 2 {
switch parts[0] {
case "agent":
agentID = parts[1]
case "delegate":
if len(parts) >= 3 {
agentID = parts[2]
}
}
}

loop, err := agents.Get(agentID)
Expand Down
37 changes: 28 additions & 9 deletions internal/agent/loop_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,18 @@ func (l *Loop) buildMessages(ctx context.Context, history []providers.Message, s
// History pipeline matching TS: limitHistoryTurns → pruneContext → sanitizeHistory.
trimmed := limitHistoryTurns(history, historyLimit)
pruned := pruneContextMessages(trimmed, l.contextWindow, l.contextPruningCfg)
messages = append(messages, sanitizeHistory(pruned)...)
sanitized, droppedCount := sanitizeHistory(pruned)
messages = append(messages, sanitized...)

// If orphaned messages were found and dropped, persist the cleaned history
// back to the session store so the same orphans don't trigger on every request.
if droppedCount > 0 {
slog.Info("sanitizeHistory: cleaned session history",
"session", sessionKey, "dropped", droppedCount)
cleanedHistory, _ := sanitizeHistory(history)
l.sessions.SetHistory(sessionKey, cleanedHistory)
l.sessions.Save(sessionKey)
}

// Current user message
messages = append(messages, providers.Message{
Expand Down Expand Up @@ -270,21 +281,26 @@ func limitHistoryTurns(msgs []providers.Message, limit int) []providers.Message
// - Orphaned tool messages at start of history (after truncation)
// - tool_result without matching tool_use in preceding assistant message
// - assistant with tool_calls but missing tool_results
func sanitizeHistory(msgs []providers.Message) []providers.Message {
//
// Returns the cleaned messages and the number of messages that were dropped or synthesized.
func sanitizeHistory(msgs []providers.Message) ([]providers.Message, int) {
if len(msgs) == 0 {
return msgs
return msgs, 0
}

dropped := 0

// 1. Skip leading orphaned tool messages (no preceding assistant with tool_calls).
start := 0
for start < len(msgs) && msgs[start].Role == "tool" {
slog.Warn("dropping orphaned tool message at history start",
slog.Debug("sanitizeHistory: dropping orphaned tool message at history start",
"tool_call_id", msgs[start].ToolCallID)
start++
dropped++
}

if start >= len(msgs) {
return nil
return nil, dropped
}

// 2. Walk through messages ensuring tool_result follows matching tool_use.
Expand All @@ -309,30 +325,33 @@ func sanitizeHistory(msgs []providers.Message) []providers.Message {
result = append(result, toolMsg)
delete(expectedIDs, toolMsg.ToolCallID)
} else {
slog.Warn("dropping mismatched tool result",
slog.Debug("sanitizeHistory: dropping mismatched tool result",
"tool_call_id", toolMsg.ToolCallID)
dropped++
}
}

// Synthesize missing tool results
for id := range expectedIDs {
slog.Warn("synthesizing missing tool result", "tool_call_id", id)
slog.Debug("sanitizeHistory: synthesizing missing tool result", "tool_call_id", id)
result = append(result, providers.Message{
Role: "tool",
Content: "[Tool result missing — session was compacted]",
ToolCallID: id,
})
dropped++
}
} else if msg.Role == "tool" {
// Orphaned tool message mid-history (no preceding assistant with matching tool_calls)
slog.Warn("dropping orphaned tool message mid-history",
slog.Debug("sanitizeHistory: dropping orphaned tool message mid-history",
"tool_call_id", msg.ToolCallID)
dropped++
} else {
result = append(result, msg)
}
}

return result
return result, dropped
}

func (l *Loop) maybeSummarize(ctx context.Context, sessionKey string) {
Expand Down
3 changes: 2 additions & 1 deletion internal/agent/memoryflush.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ func (l *Loop) runMemoryFlush(ctx context.Context, sessionKey string, settings *
if len(recentHistory) > 10 {
recentHistory = recentHistory[len(recentHistory)-10:]
}
messages = append(messages, sanitizeHistory(recentHistory)...)
sanitized, _ := sanitizeHistory(recentHistory)
messages = append(messages, sanitized...)

// Flush prompt
messages = append(messages, providers.Message{
Expand Down
138 changes: 119 additions & 19 deletions internal/channels/zalo/zalo.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"io"
"log/slog"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -88,7 +90,7 @@ func (c *Channel) Start(ctx context.Context) error {
if err != nil {
return fmt.Errorf("zalo getMe failed: %w", err)
}
slog.Info("zalo bot connected", "bot_id", info.ID, "bot_name", info.Name)
slog.Info("zalo bot connected", "bot_id", info.ID, "bot_name", info.Label())

c.SetRunning(true)

Expand Down Expand Up @@ -231,14 +233,33 @@ func (c *Channel) handleImageMessage(msg *zaloMessage) {
content = "[image]"
}

// Download photo from Zalo CDN to local temp file (CDN URLs are auth-restricted/expiring)
var media []string
if msg.Photo != "" {
media = []string{msg.Photo}
var photoURL string
switch {
case msg.PhotoURL != "":
photoURL = msg.PhotoURL
case msg.Photo != "":
photoURL = msg.Photo
}

slog.Debug("zalo image message received",
if photoURL != "" {
localPath, err := c.downloadMedia(photoURL)
if err != nil {
slog.Warn("zalo photo download failed, passing URL as fallback",
"photo_url", photoURL, "error", err)
media = []string{photoURL}
} else {
media = []string{localPath}
}
}

slog.Info("zalo image message received",
"sender_id", senderID,
"chat_id", chatID,
"photo_url", photoURL,
"has_media", len(media) > 0,
"downloaded", len(media) > 0 && !strings.HasPrefix(media[0], "http"),
)

metadata := map[string]string{
Expand Down Expand Up @@ -316,6 +337,55 @@ func (c *Channel) sendPairingReply(senderID, chatID string) {
}
}

// --- Media download ---

const maxMediaBytes = 10 * 1024 * 1024 // 10MB

// downloadMedia fetches a photo from a Zalo CDN URL and saves it as a local temp file.
func (c *Channel) downloadMedia(url string) (string, error) {
resp, err := c.client.Get(url)
if err != nil {
return "", fmt.Errorf("fetch: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("http %d", resp.StatusCode)
}

// Detect extension from Content-Type
ext := ".jpg"
ct := resp.Header.Get("Content-Type")
switch {
case strings.Contains(ct, "png"):
ext = ".png"
case strings.Contains(ct, "gif"):
ext = ".gif"
case strings.Contains(ct, "webp"):
ext = ".webp"
}

f, err := os.CreateTemp("", "goclaw_zalo_*"+ext)
if err != nil {
return "", fmt.Errorf("create temp: %w", err)
}
defer f.Close()

n, err := io.Copy(f, io.LimitReader(resp.Body, maxMediaBytes))
if err != nil {
os.Remove(f.Name())
return "", fmt.Errorf("write: %w", err)
}

slog.Debug("zalo media downloaded",
"url", url[:min(len(url), 80)],
"path", filepath.Base(f.Name()),
"bytes", n,
)

return f.Name(), nil
}

// --- Chunked text sending ---

func (c *Channel) sendChunkedText(chatID, text string) error {
Expand Down Expand Up @@ -350,28 +420,41 @@ type zaloAPIResponse struct {
}

type zaloBotInfo struct {
ID string `json:"id"`
Name string `json:"name"`
ID string `json:"id"`
Name string `json:"account_name"`
DisplayName string `json:"display_name"`
}

func (b *zaloBotInfo) Label() string {
if b.DisplayName != "" {
return b.DisplayName
}
return b.Name
}

type zaloMessage struct {
MessageID string `json:"message_id"`
Text string `json:"text"`
Photo string `json:"photo"`
Caption string `json:"caption"`
From zaloFrom `json:"from"`
Chat zaloChat `json:"chat"`
Date int64 `json:"date"`
MessageID string `json:"message_id"`
MessageType string `json:"message_type"`
Text string `json:"text"`
Photo string `json:"photo"`
PhotoURL string `json:"photo_url"`
Caption string `json:"caption"`
From zaloFrom `json:"from"`
Chat zaloChat `json:"chat"`
Date int64 `json:"date"`
}

type zaloFrom struct {
ID string `json:"id"`
Username string `json:"username"`
ID string `json:"id"`
Username string `json:"username"`
DisplayName string `json:"display_name"`
IsBot bool `json:"is_bot"`
}

type zaloChat struct {
ID string `json:"id"`
Type string `json:"type"`
ID string `json:"id"`
Type string `json:"type"`
ChatType string `json:"chat_type"`
}

type zaloUpdate struct {
Expand Down Expand Up @@ -445,11 +528,28 @@ func (c *Channel) getUpdates(timeout int) ([]zaloUpdate, error) {
return nil, err
}

// Try array first
var updates []zaloUpdate
if err := json.Unmarshal(result, &updates); err != nil {
if err := json.Unmarshal(result, &updates); err == nil {
return updates, nil
}

// Try single object (Zalo Bot Platform returns one update at a time)
var single zaloUpdate
if err := json.Unmarshal(result, &single); err == nil && single.EventName != "" {
slog.Info("zalo update received", "event", single.EventName)
return []zaloUpdate{single}, nil
}

// Try wrapped {"updates": [...]}
var wrapped struct {
Updates []zaloUpdate `json:"updates"`
}
if err := json.Unmarshal(result, &wrapped); err != nil {
slog.Warn("zalo getUpdates unknown format", "raw", string(result[:min(len(result), 500)]))
return nil, fmt.Errorf("unmarshal updates: %w", err)
}
return updates, nil
return wrapped.Updates, nil
}

func (c *Channel) sendMessage(chatID, text string) error {
Expand Down
Loading