diff --git a/packages/happy-app/sources/components/ChatList.tsx b/packages/happy-app/sources/components/ChatList.tsx index 72ca553cc..d978dea26 100644 --- a/packages/happy-app/sources/components/ChatList.tsx +++ b/packages/happy-app/sources/components/ChatList.tsx @@ -1,29 +1,65 @@ import * as React from 'react'; import { useSession, useSessionMessages } from "@/sync/storage"; -import { ActivityIndicator, FlatList, Platform, View } from 'react-native'; +import { ActivityIndicator, FlatList, Platform, Pressable, Text, View } from 'react-native'; import { useCallback } from 'react'; import { useHeaderHeight } from '@/utils/responsive'; import { useSafeAreaInsets } from 'react-native-safe-area-context'; +import { useUnistyles } from 'react-native-unistyles'; import { MessageView } from './MessageView'; import { Metadata, Session } from '@/sync/storageTypes'; import { ChatFooter } from './ChatFooter'; import { Message } from '@/sync/typesMessage'; +import { Typography } from '@/constants/Typography'; +import { sync } from '@/sync/sync'; export const ChatList = React.memo((props: { session: Session }) => { - const { messages } = useSessionMessages(props.session.id); + const { messages, hasOlderMessages, isLoadingOlder } = useSessionMessages(props.session.id); return ( ) }); -const ListHeader = React.memo(() => { +const LoadOlderMessages = React.memo((props: { sessionId: string; hasOlderMessages: boolean; isLoadingOlder: boolean }) => { + const { theme } = useUnistyles(); const headerHeight = useHeaderHeight(); const safeArea = useSafeAreaInsets(); - return ; + + const handlePress = useCallback(() => { + void sync.fetchOlderMessages(props.sessionId).catch(() => {}); + }, [props.sessionId]); + + return ( + + {props.hasOlderMessages && ( + props.isLoadingOlder ? ( + + ) : ( + + + Load older messages + + + ) + )} + {!props.hasOlderMessages && ( + + )} + + ); }); const ListFooter = React.memo((props: { sessionId: string }) => { @@ -37,6 +73,8 @@ const ChatListInternal = React.memo((props: { metadata: Metadata | null, sessionId: string, messages: Message[], + hasOlderMessages: boolean, + isLoadingOlder: boolean, }) => { const keyExtractor = useCallback((item: any) => item.id, []); const renderItem = useCallback(({ item }: { item: any }) => ( @@ -55,7 +93,13 @@ const ChatListInternal = React.memo((props: { keyboardDismissMode={Platform.OS === 'ios' ? 'interactive' : 'none'} renderItem={renderItem} ListHeaderComponent={} - ListFooterComponent={} + ListFooterComponent={ + + } /> ) -}); \ No newline at end of file +}); diff --git a/packages/happy-app/sources/hooks/useDemoMessages.ts b/packages/happy-app/sources/hooks/useDemoMessages.ts index 4f5bd36ef..b256b8d85 100644 --- a/packages/happy-app/sources/hooks/useDemoMessages.ts +++ b/packages/happy-app/sources/hooks/useDemoMessages.ts @@ -25,7 +25,10 @@ export function useDemoMessages(messages: Message[]) { messages: sortedMessages, messagesMap: messagesMap, reducerState: createReducer(), - isLoaded: true + isLoaded: true, + hasOlderMessages: false, + oldestLoadedSeq: null, + isLoadingOlder: false } } })); diff --git a/packages/happy-app/sources/sync/apiSocket.ts b/packages/happy-app/sources/sync/apiSocket.ts index 7e64ae583..40297818b 100644 --- a/packages/happy-app/sources/sync/apiSocket.ts +++ b/packages/happy-app/sources/sync/apiSocket.ts @@ -117,11 +117,11 @@ class ApiSocket { throw new Error(`Session encryption not found for ${sessionId}`); } - const result = await this.socket!.emitWithAck('rpc-call', { + const result = await this.socket!.timeout(30000).emitWithAck('rpc-call', { method: `${sessionId}:${method}`, params: await sessionEncryption.encryptRaw(params) }); - + if (result.ok) { return await sessionEncryption.decryptRaw(result.result) as R; } @@ -137,7 +137,7 @@ class ApiSocket { throw new Error(`Machine encryption not found for ${machineId}`); } - const result = await this.socket!.emitWithAck('rpc-call', { + const result = await this.socket!.timeout(30000).emitWithAck('rpc-call', { method: `${machineId}:${method}`, params: await machineEncryption.encryptRaw(params) }); @@ -157,7 +157,7 @@ class ApiSocket { if (!this.socket) { throw new Error('Socket not connected'); } - return await this.socket.emitWithAck(event, data); + return await this.socket.timeout(15000).emitWithAck(event, data); } // @@ -180,10 +180,20 @@ class ApiSocket { ...options?.headers }; - return fetch(url, { - ...options, - headers - }); + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), 60000); + if (options?.signal) { + options.signal.addEventListener('abort', () => controller.abort(), { once: true }); + } + try { + return await fetch(url, { + ...options, + headers, + signal: controller.signal + }); + } finally { + clearTimeout(timeout); + } } // diff --git a/packages/happy-app/sources/sync/storage.ts b/packages/happy-app/sources/sync/storage.ts index 728f679ee..03065def7 100644 --- a/packages/happy-app/sources/sync/storage.ts +++ b/packages/happy-app/sources/sync/storage.ts @@ -55,6 +55,9 @@ interface SessionMessages { messagesMap: Record; reducerState: ReducerState; isLoaded: boolean; + hasOlderMessages: boolean; + oldestLoadedSeq: number | null; + isLoadingOlder: boolean; } // Machine type is now imported from storageTypes - represents persisted machine data @@ -103,6 +106,7 @@ interface StorageState { applyReady: () => void; applyMessages: (sessionId: string, messages: NormalizedMessage[]) => { changed: string[], hasReadyEvent: boolean }; applyMessagesLoaded: (sessionId: string) => void; + setOlderMessagesState: (sessionId: string, update: { hasOlderMessages?: boolean; oldestLoadedSeq?: number | null; isLoadingOlder?: boolean }) => void; applySettings: (settings: Settings, version: number) => void; applySettingsLocal: (settings: Partial) => void; applyLocalSettings: (settings: Partial) => void; @@ -434,7 +438,10 @@ export const storage = create()((set, get) => { messages: messagesArray, messagesMap: mergedMessagesMap, reducerState: existingSessionMessages.reducerState, // The reducer modifies state in-place, so this has the updates - isLoaded: existingSessionMessages.isLoaded + isLoaded: existingSessionMessages.isLoaded, + hasOlderMessages: existingSessionMessages.hasOlderMessages, + oldestLoadedSeq: existingSessionMessages.oldestLoadedSeq, + isLoadingOlder: existingSessionMessages.isLoadingOlder }; // IMPORTANT: Copy latestUsage from reducerState to Session for immediate availability @@ -490,7 +497,10 @@ export const storage = create()((set, get) => { messages: [], messagesMap: {}, reducerState: createReducer(), - isLoaded: false + isLoaded: false, + hasOlderMessages: false, + oldestLoadedSeq: null, + isLoadingOlder: false }; // Get the session's agentState if available @@ -608,7 +618,10 @@ export const storage = create()((set, get) => { reducerState, messages, messagesMap, - isLoaded: true + isLoaded: true, + hasOlderMessages: false, + oldestLoadedSeq: null, + isLoadingOlder: false } satisfies SessionMessages } }; @@ -627,6 +640,22 @@ export const storage = create()((set, get) => { return result; }), + setOlderMessagesState: (sessionId: string, update: { hasOlderMessages?: boolean; oldestLoadedSeq?: number | null; isLoadingOlder?: boolean }) => set((state) => { + const existingSession = state.sessionMessages[sessionId]; + if (!existingSession) return state; + return { + ...state, + sessionMessages: { + ...state.sessionMessages, + [sessionId]: { + ...existingSession, + ...(update.hasOlderMessages !== undefined && { hasOlderMessages: update.hasOlderMessages }), + ...(update.oldestLoadedSeq !== undefined && { oldestLoadedSeq: update.oldestLoadedSeq }), + ...(update.isLoadingOlder !== undefined && { isLoadingOlder: update.isLoadingOlder }), + } + } + }; + }), applySettingsLocal: (settings: Partial) => set((state) => { saveSettings(applySettings(state.settings, settings), state.settingsVersion ?? 0); return { @@ -1078,12 +1107,14 @@ export function useSession(id: string): Session | null { const emptyArray: unknown[] = []; -export function useSessionMessages(sessionId: string): { messages: Message[], isLoaded: boolean } { +export function useSessionMessages(sessionId: string): { messages: Message[], isLoaded: boolean, hasOlderMessages: boolean, isLoadingOlder: boolean } { return storage(useShallow((state) => { const session = state.sessionMessages[sessionId]; return { messages: session?.messages ?? emptyArray, - isLoaded: session?.isLoaded ?? false + isLoaded: session?.isLoaded ?? false, + hasOlderMessages: session?.hasOlderMessages ?? false, + isLoadingOlder: session?.isLoadingOlder ?? false }; })); } diff --git a/packages/happy-app/sources/sync/sync.ts b/packages/happy-app/sources/sync/sync.ts index 7d95297ff..60df90293 100644 --- a/packages/happy-app/sources/sync/sync.ts +++ b/packages/happy-app/sources/sync/sync.ts @@ -2,12 +2,14 @@ import Constants from 'expo-constants'; import { apiSocket } from '@/sync/apiSocket'; import { AuthCredentials } from '@/auth/tokenStorage'; import { Encryption } from '@/sync/encryption/encryption'; +import { SessionEncryption } from '@/sync/encryption/sessionEncryption'; import { decodeBase64, encodeBase64 } from '@/encryption/base64'; import { storage } from './storage'; import { ApiEphemeralUpdateSchema, ApiMessage, ApiUpdateContainerSchema } from './apiTypes'; import type { ApiEphemeralActivityUpdate } from './apiTypes'; import { Session, Machine } from './storageTypes'; import { InvalidateSync } from '@/utils/sync'; +import { delay } from '@/utils/time'; import { ActivityUpdateAccumulator } from './reducer/activityUpdateAccumulator'; import { randomUUID } from 'expo-crypto'; import * as Notifications from 'expo-notifications'; @@ -103,16 +105,22 @@ class Sync { private lastRecalculationTime = 0; constructor() { - this.sessionsSync = new InvalidateSync(this.fetchSessions); - this.settingsSync = new InvalidateSync(this.syncSettings); - this.profileSync = new InvalidateSync(this.fetchProfile); - this.purchasesSync = new InvalidateSync(this.syncPurchases); - this.machinesSync = new InvalidateSync(this.fetchMachines); - this.nativeUpdateSync = new InvalidateSync(this.fetchNativeUpdate); - this.artifactsSync = new InvalidateSync(this.fetchArtifactsList); - this.friendsSync = new InvalidateSync(this.fetchFriends); - this.friendRequestsSync = new InvalidateSync(this.fetchFriendRequests); - this.feedSync = new InvalidateSync(this.fetchFeed); + const startupSyncOpts = { + maxRetries: 50, + onError: (error: unknown) => { + console.error('[Sync] Startup sync gave up:', error); + }, + }; + this.sessionsSync = new InvalidateSync(this.fetchSessions, startupSyncOpts); + this.settingsSync = new InvalidateSync(this.syncSettings, startupSyncOpts); + this.profileSync = new InvalidateSync(this.fetchProfile, startupSyncOpts); + this.purchasesSync = new InvalidateSync(this.syncPurchases, startupSyncOpts); + this.machinesSync = new InvalidateSync(this.fetchMachines, startupSyncOpts); + this.nativeUpdateSync = new InvalidateSync(this.fetchNativeUpdate, startupSyncOpts); + this.artifactsSync = new InvalidateSync(this.fetchArtifactsList, startupSyncOpts); + this.friendsSync = new InvalidateSync(this.fetchFriends, startupSyncOpts); + this.friendRequestsSync = new InvalidateSync(this.fetchFriendRequests, startupSyncOpts); + this.feedSync = new InvalidateSync(this.fetchFeed, startupSyncOpts); const registerPushToken = async () => { if (__DEV__) { @@ -120,7 +128,7 @@ class Sync { } await this.registerPushToken(); } - this.pushTokenSync = new InvalidateSync(registerPushToken); + this.pushTokenSync = new InvalidateSync(registerPushToken, startupSyncOpts); this.activityAccumulator = new ActivityUpdateAccumulator(this.flushActivityUpdates.bind(this), 2000); // Listen for app state changes to refresh purchases @@ -240,7 +248,12 @@ class Sync { private getMessagesSync(sessionId: string): InvalidateSync { let sync = this.messagesSync.get(sessionId); if (!sync) { - sync = new InvalidateSync(() => this.fetchMessages(sessionId)); + sync = new InvalidateSync(() => this.fetchMessages(sessionId), { + maxRetries: 30, + onError: (error) => { + console.error(`[Sync] Message sync for session ${sessionId} gave up:`, error); + }, + }); this.messagesSync.set(sessionId, sync); } return sync; @@ -1561,14 +1574,15 @@ class Sync { const data = await response.json() as V3PostSessionMessagesResponse; pending.splice(0, batch.length); if (Array.isArray(data.messages) && data.messages.length > 0) { - const currentLastSeq = this.sessionLastSeq.get(sessionId) ?? 0; - let maxSeq = currentLastSeq; + let maxSeq = 0; for (const message of data.messages) { if (message.seq > maxSeq) { maxSeq = message.seq; } } - this.sessionLastSeq.set(sessionId, maxSeq); + // Monotonic advance: re-read current value to avoid rewinding if socket raced ahead + const latestSeq = this.sessionLastSeq.get(sessionId) ?? 0; + this.sessionLastSeq.set(sessionId, Math.max(latestSeq, maxSeq)); } } catch (error) { this.maybeStartBackgroundSendWatchdog(); @@ -1589,6 +1603,32 @@ class Sync { } } + private static INITIAL_MESSAGE_LIMIT = 50; + + private decryptAndNormalize = async ( + encryption: SessionEncryption, + messages: ApiMessage[] + ): Promise<{ normalized: NormalizedMessage[]; decryptFailed: number; normalizeFailed: number }> => { + const decryptedMessages = await encryption.decryptMessages(messages); + const normalized: NormalizedMessage[] = []; + let decryptFailed = 0; + let normalizeFailed = 0; + for (let i = 0; i < decryptedMessages.length; i++) { + const decrypted = decryptedMessages[i]; + if (!decrypted || !decrypted.content) { + decryptFailed++; + continue; + } + const msg = normalizeRawMessage(decrypted.id, decrypted.localId, decrypted.createdAt, decrypted.content); + if (msg) { + normalized.push(msg); + } else { + normalizeFailed++; + } + } + return { normalized, decryptFailed, normalizeFailed }; + } + private fetchMessages = async (sessionId: string) => { log.log(`💬 fetchMessages starting for session ${sessionId} - acquiring lock`); const lock = this.getSessionMessageLock(sessionId); @@ -1599,55 +1639,173 @@ class Sync { throw new Error(`Session encryption not ready for ${sessionId}`); } - let afterSeq = this.sessionLastSeq.get(sessionId) ?? 0; - let hasMore = true; - let totalNormalized = 0; + const lastSeq = this.sessionLastSeq.get(sessionId) ?? 0; + + if (lastSeq === 0) { + // Initial load: fetch latest N messages using reverse pagination + await this.fetchLatestMessages(sessionId, encryption); + } else { + // Incremental sync: fetch new messages after lastSeq + await this.fetchForwardMessages(sessionId, encryption, lastSeq); + } + + storage.getState().applyMessagesLoaded(sessionId); + }); + } + + private fetchLatestMessages = async ( + sessionId: string, + encryption: SessionEncryption + ) => { + const limit = Sync.INITIAL_MESSAGE_LIMIT; + + // No cursor params = server returns latest N messages (reverse pagination) + const response = await apiSocket.request(`/v3/sessions/${sessionId}/messages?limit=${limit}`); + if (!response.ok) { + throw new Error(`Failed to fetch latest messages for ${sessionId}: ${response.status}`); + } + const data = await response.json() as V3GetSessionMessagesResponse; + const messages = Array.isArray(data.messages) ? data.messages : []; + + const { normalized, decryptFailed, normalizeFailed } = await this.decryptAndNormalize(encryption, messages); + + // Find min and max seq + let minSeq = Infinity; + let maxSeq = 0; + for (const message of messages) { + if (message.seq < minSeq) minSeq = message.seq; + if (message.seq > maxSeq) maxSeq = message.seq; + } + + if (normalized.length > 0) { + this.applyMessages(sessionId, normalized); + } + + // Monotonic advance: never rewind cursor even if socket updates raced ahead + if (maxSeq > 0) { + const currentSeq = this.sessionLastSeq.get(sessionId) ?? 0; + this.sessionLastSeq.set(sessionId, Math.max(currentSeq, maxSeq)); + } + + // Track pagination state + storage.getState().setOlderMessagesState(sessionId, { + hasOlderMessages: data.hasMore, + oldestLoadedSeq: messages.length > 0 ? minSeq : null, + }); + + if (messages.length > 0 && normalized.length === 0) { + console.warn(`[Sync] All ${messages.length} fetched messages for session ${sessionId} were dropped (decryptFailed=${decryptFailed}, normalizeFailed=${normalizeFailed}). Session may appear empty despite having messages.`); + } + log.log(`💬 fetchLatestMessages completed for session ${sessionId} - fetched=${messages.length}, normalized=${normalized.length}, hasOlder=${data.hasMore}, lastSeq=${maxSeq}`); + } + + private fetchForwardMessages = async ( + sessionId: string, + encryption: SessionEncryption, + startAfterSeq: number + ) => { + let afterSeq = startAfterSeq; + let hasMore = true; + let totalNormalized = 0; + let totalFetched = 0; + let totalDecryptFailed = 0; + let totalNormalizeFailed = 0; + + while (hasMore) { + const response = await apiSocket.request(`/v3/sessions/${sessionId}/messages?after_seq=${afterSeq}&limit=100`); + if (!response.ok) { + throw new Error(`Failed to fetch messages for ${sessionId}: ${response.status}`); + } + const data = await response.json() as V3GetSessionMessagesResponse; + const messages = Array.isArray(data.messages) ? data.messages : []; + totalFetched += messages.length; + + let maxSeq = afterSeq; + for (const message of messages) { + if (message.seq > maxSeq) { + maxSeq = message.seq; + } + } + + const { normalized, decryptFailed, normalizeFailed } = await this.decryptAndNormalize(encryption, messages); + totalDecryptFailed += decryptFailed; + totalNormalizeFailed += normalizeFailed; + + if (normalized.length > 0) { + totalNormalized += normalized.length; + this.applyMessages(sessionId, normalized); + } + + // Monotonic advance: never rewind cursor even if socket updates raced ahead + const currentSeq = this.sessionLastSeq.get(sessionId) ?? 0; + this.sessionLastSeq.set(sessionId, Math.max(currentSeq, maxSeq)); + hasMore = !!data.hasMore; + if (hasMore && maxSeq === afterSeq) { + log.log(`💬 fetchMessages: pagination stalled for ${sessionId}, stopping to avoid infinite loop`); + break; + } + afterSeq = maxSeq; + } + + if (totalFetched > 0 && totalNormalized === 0) { + console.warn(`[Sync] All ${totalFetched} fetched messages for session ${sessionId} were dropped (decryptFailed=${totalDecryptFailed}, normalizeFailed=${totalNormalizeFailed}). Session may appear empty despite having messages.`); + } + log.log(`💬 fetchForwardMessages completed for session ${sessionId} - fetched=${totalFetched}, normalized=${totalNormalized}, decryptFailed=${totalDecryptFailed}, normalizeFailed=${totalNormalizeFailed}, lastSeq=${afterSeq}`); + } + + fetchOlderMessages = async (sessionId: string) => { + const sessionMessages = storage.getState().sessionMessages[sessionId]; + if (!sessionMessages || !sessionMessages.hasOlderMessages || sessionMessages.oldestLoadedSeq === null) { + return; + } + + storage.getState().setOlderMessagesState(sessionId, { isLoadingOlder: true }); - while (hasMore) { - const response = await apiSocket.request(`/v3/sessions/${sessionId}/messages?after_seq=${afterSeq}&limit=100`); + try { + const lock = this.getSessionMessageLock(sessionId); + await lock.inLock(async () => { + const encryption = this.encryption.getSessionEncryption(sessionId); + if (!encryption) { + throw new Error(`Session encryption not ready for ${sessionId}`); + } + + const beforeSeq = storage.getState().sessionMessages[sessionId]?.oldestLoadedSeq; + if (beforeSeq === null || beforeSeq === undefined) { + storage.getState().setOlderMessagesState(sessionId, { isLoadingOlder: false }); + return; + } + + const limit = Sync.INITIAL_MESSAGE_LIMIT; + const response = await apiSocket.request(`/v3/sessions/${sessionId}/messages?before_seq=${beforeSeq}&limit=${limit}`); if (!response.ok) { - throw new Error(`Failed to fetch messages for ${sessionId}: ${response.status}`); + throw new Error(`Failed to fetch older messages for ${sessionId}: ${response.status}`); } const data = await response.json() as V3GetSessionMessagesResponse; const messages = Array.isArray(data.messages) ? data.messages : []; - let maxSeq = afterSeq; - for (const message of messages) { - if (message.seq > maxSeq) { - maxSeq = message.seq; - } - } + const { normalized } = await this.decryptAndNormalize(encryption, messages); - const decryptedMessages = await encryption.decryptMessages(messages); - const normalizedMessages: NormalizedMessage[] = []; - for (let i = 0; i < decryptedMessages.length; i++) { - const decrypted = decryptedMessages[i]; - if (!decrypted) { - continue; - } - const normalized = normalizeRawMessage(decrypted.id, decrypted.localId, decrypted.createdAt, decrypted.content); - if (normalized) { - normalizedMessages.push(normalized); - } + let minSeq = beforeSeq; + for (const message of messages) { + if (message.seq < minSeq) minSeq = message.seq; } - if (normalizedMessages.length > 0) { - totalNormalized += normalizedMessages.length; - this.enqueueMessages(sessionId, normalizedMessages); + if (normalized.length > 0) { + this.applyMessages(sessionId, normalized); } - this.sessionLastSeq.set(sessionId, maxSeq); - hasMore = !!data.hasMore; - if (hasMore && maxSeq === afterSeq) { - log.log(`💬 fetchMessages: pagination stalled for ${sessionId}, stopping to avoid infinite loop`); - break; - } - afterSeq = maxSeq; - } + storage.getState().setOlderMessagesState(sessionId, { + hasOlderMessages: data.hasMore, + oldestLoadedSeq: messages.length > 0 ? minSeq : beforeSeq, + isLoadingOlder: false, + }); - storage.getState().applyMessagesLoaded(sessionId); - log.log(`💬 fetchMessages completed for session ${sessionId} - processed ${totalNormalized} messages`); - }); + log.log(`💬 fetchOlderMessages completed for session ${sessionId} - fetched=${messages.length}, normalized=${normalized.length}, hasOlder=${data.hasMore}`); + }); + } catch (error) { + storage.getState().setOlderMessagesState(sessionId, { isLoadingOlder: false }); + throw error; + } } private registerPushToken = async () => { @@ -1720,24 +1878,33 @@ class Sync { } private handleUpdate = async (update: unknown) => { - console.log('🔄 Sync: handleUpdate called with:', JSON.stringify(update).substring(0, 300)); const validatedUpdate = ApiUpdateContainerSchema.safeParse(update); if (!validatedUpdate.success) { - console.log('❌ Sync: Invalid update received:', validatedUpdate.error); - console.error('❌ Sync: Invalid update data:', update); + console.warn(`[Sync] Invalid update dropped:`, validatedUpdate.error.issues.map(i => `${i.path.join('.')}: ${i.message}`).join(', ')); return; } const updateData = validatedUpdate.data; - console.log(`🔄 Sync: Validated update type: ${updateData.body.t}`); if (updateData.body.t === 'new-message') { - // Get encryption - const encryption = this.encryption.getSessionEncryption(updateData.body.sid); - if (!encryption) { // Should never happen - console.error(`Session ${updateData.body.sid} not found`); - this.fetchSessions(); // Just fetch sessions again - return; + // Get encryption - if not ready, refresh sessions then fall through to fetchMessages + let encryption = this.encryption.getSessionEncryption(updateData.body.sid); + if (!encryption) { + console.error(`[Sync] Session encryption missing for ${updateData.body.sid}, refreshing sessions`); + try { + await Promise.race([ + this.sessionsSync.invalidateAndAwait(), + delay(10000).then(() => { throw new Error('Session refresh timed out'); }), + ]); + } catch (e) { + console.error(`[Sync] Failed to refresh sessions:`, e); + } + encryption = this.encryption.getSessionEncryption(updateData.body.sid); + if (!encryption) { + console.error(`[Sync] Session encryption still missing for ${updateData.body.sid} after refresh, triggering fetchMessages`); + this.getMessagesSync(updateData.body.sid).invalidate(); + return; + } } // Decrypt message @@ -1801,7 +1968,6 @@ class Sync { const currentLastSeq = this.sessionLastSeq.get(updateData.body.sid); const incomingSeq = updateData.body.message.seq; if (lastMessage && currentLastSeq !== undefined && incomingSeq === currentLastSeq + 1) { - console.log('🔄 Sync: Applying message (fast path):', JSON.stringify(lastMessage)); this.enqueueMessages(updateData.body.sid, [lastMessage]); this.sessionLastSeq.set(updateData.body.sid, incomingSeq); let hasMutableTool = false; @@ -1817,8 +1983,12 @@ class Sync { } } - // Ping session - this.onSessionVisible(updateData.body.sid); + // Refresh git status and voice hooks (but NOT messages — already handled above) + gitStatusSync.getSync(updateData.body.sid).invalidate(); + const visibleSession = storage.getState().sessions[updateData.body.sid]; + if (visibleSession) { + voiceHooks.onSessionFocus(updateData.body.sid, visibleSession.metadata || undefined); + } } else if (updateData.body.t === 'new-session') { log.log('🆕 New session update received'); diff --git a/packages/happy-app/sources/sync/typesRaw.spec.ts b/packages/happy-app/sources/sync/typesRaw.spec.ts index 2378d8d0c..104d91363 100644 --- a/packages/happy-app/sources/sync/typesRaw.spec.ts +++ b/packages/happy-app/sources/sync/typesRaw.spec.ts @@ -1603,7 +1603,7 @@ describe('Zod Transform - WOLOG Content Normalization', () => { } }); - it('drops modern user session envelopes when send flag is disabled', () => { + it('normalizes modern user session envelopes regardless of send flag', () => { const normalized = normalizeRawMessage('db-modern-user-flag-off-1', null, 1, { role: 'session', content: { @@ -1614,7 +1614,14 @@ describe('Zod Transform - WOLOG Content Normalization', () => { } } as any); - expect(normalized).toBeNull(); + expect(normalized).toBeTruthy(); + expect(normalized?.role).toBe('user'); + if (normalized && normalized.role === 'user') { + expect(normalized.content).toEqual({ + type: 'text', + text: 'modern user envelope' + }); + } }); it('uses modern user session envelopes for user content when send flag is enabled', () => { @@ -1640,7 +1647,7 @@ describe('Zod Transform - WOLOG Content Normalization', () => { } }); - it('drops legacy user text envelopes when send flag is enabled', () => { + it('normalizes legacy user text envelopes regardless of send flag', () => { process.env.ENABLE_SESSION_PROTOCOL_SEND = 'true'; const normalized = normalizeRawMessage('db-user-legacy-flag-on', null, 1, { @@ -1651,7 +1658,14 @@ describe('Zod Transform - WOLOG Content Normalization', () => { } } as any); - expect(normalized).toBeNull(); + expect(normalized).toBeTruthy(); + expect(normalized?.role).toBe('user'); + if (normalized && normalized.role === 'user') { + expect(normalized.content).toEqual({ + type: 'text', + text: 'legacy user protocol' + }); + } }); it('normalizes service events to visible agent text', () => { diff --git a/packages/happy-app/sources/sync/typesRaw.ts b/packages/happy-app/sources/sync/typesRaw.ts index 5ac33bb0f..ddcb2a5a1 100644 --- a/packages/happy-app/sources/sync/typesRaw.ts +++ b/packages/happy-app/sources/sync/typesRaw.ts @@ -539,6 +539,7 @@ function normalizeSessionEnvelope( // Session protocol requires turn id on all agent-originated envelopes. // Drop malformed agent events without turn to avoid attaching stray messages. if (envelope.role === 'agent' && !envelope.turn) { + console.warn(`[normalizeSessionEnvelope] Dropping agent envelope without turn: ev.t=${envelope.ev.t}, id=${envelope.id}`); return null; } @@ -592,10 +593,6 @@ function normalizeSessionEnvelope( if (envelope.ev.t === 'text') { if (envelope.role === 'user') { - if (!isSessionProtocolSendEnabled()) { - return null; - } - return { id: messageId, localId, @@ -716,18 +713,11 @@ export function normalizeRawMessage(id: string, localId: string | null, createdA // Zod transform handles normalization during validation let parsed = rawRecordSchema.safeParse(raw); if (!parsed.success) { - console.error('=== VALIDATION ERROR ==='); - console.error('Zod issues:', JSON.stringify(parsed.error.issues, null, 2)); - console.error('Raw message:', JSON.stringify(raw, null, 2)); - console.error('=== END ERROR ==='); + console.warn(`[normalizeRawMessage] Zod validation failed for message ${id}:`, parsed.error.issues.map(i => `${i.path.join('.')}: ${i.message}`).join(', ')); return null; } raw = parsed.data; if (raw.role === 'user') { - if (isSessionProtocolSendEnabled()) { - return null; - } - return { id, localId, @@ -749,12 +739,12 @@ export function normalizeRawMessage(id: string, localId: string | null, createdA if (raw.role === 'agent') { if (raw.content.type === 'output') { - // Skip Meta messages + // Skip Meta messages (expected — not rendered) if (raw.content.data.isMeta) { return null; } - // Skip compact summary messages + // Skip compact summary messages (expected — not rendered) if (raw.content.data.isCompactSummary) { return null; } @@ -762,6 +752,7 @@ export function normalizeRawMessage(id: string, localId: string | null, createdA // Handle Assistant messages (including sidechains) if (raw.content.data.type === 'assistant') { if (!raw.content.data.uuid) { + console.warn(`[normalizeRawMessage] Dropping assistant message ${id}: missing uuid`); return null; } let content: NormalizedAgentContent[] = []; diff --git a/packages/happy-app/sources/utils/sync.ts b/packages/happy-app/sources/utils/sync.ts index 731487840..0ba21ac51 100644 --- a/packages/happy-app/sources/utils/sync.ts +++ b/packages/happy-app/sources/utils/sync.ts @@ -1,20 +1,42 @@ -import { backoff } from "@/utils/time"; +import { backoff, BackoffGaveUpError, createBackoff } from "@/utils/time"; export class InvalidateSync { private _invalidated = false; private _invalidatedDouble = false; private _stopped = false; + private _wedged = false; private _command: () => Promise; private _pendings: (() => void)[] = []; + private _onError?: (error: unknown) => void; + private _backoff: typeof backoff; - constructor(command: () => Promise) { + constructor(command: () => Promise, opts?: { + onError?: (error: unknown) => void; + maxRetries?: number; + }) { this._command = command; + this._onError = opts?.onError; + this._backoff = opts?.maxRetries !== undefined + ? createBackoff({ maxRetries: opts.maxRetries, onError: (e) => { console.warn(e); } }) + : backoff; + } + + get isWedged() { + return this._wedged; } invalidate() { if (this._stopped) { return; } + // If previously wedged, reset and allow a fresh sync attempt + if (this._wedged) { + this._wedged = false; + this._invalidated = true; + this._invalidatedDouble = false; + this._doSync(); + return; + } if (!this._invalidated) { this._invalidated = true; this._invalidatedDouble = false; @@ -62,12 +84,26 @@ export class InvalidateSync { private _doSync = async () => { - await backoff(async () => { - if (this._stopped) { + try { + await this._backoff(async () => { + if (this._stopped) { + return; + } + await this._command(); + }); + } catch (e) { + if (e instanceof BackoffGaveUpError) { + console.error(`[InvalidateSync] Gave up after ${e.attempts} retries:`, e.lastError); + this._wedged = true; + this._invalidated = false; + this._notifyPendings(); + if (this._onError) { + this._onError(e); + } return; } - await this._command(); - }); + throw e; + } if (this._stopped) { this._notifyPendings(); return; @@ -144,21 +180,21 @@ export class ValueSync { while (this._hasValue && !this._stopped) { const value = this._latestValue!; this._hasValue = false; - + await backoff(async () => { if (this._stopped) { return; } await this._command(value); }); - + if (this._stopped) { this._notifyPendings(); return; } } - + this._processing = false; this._notifyPendings(); } -} \ No newline at end of file +} diff --git a/packages/happy-app/sources/utils/time.ts b/packages/happy-app/sources/utils/time.ts index 1feedf57b..c9069caff 100644 --- a/packages/happy-app/sources/utils/time.ts +++ b/packages/happy-app/sources/utils/time.ts @@ -7,6 +7,17 @@ export function exponentialBackoffDelay(currentFailureCount: number, minDelay: n return Math.round(Math.random() * maxDelayRet); } +export class BackoffGaveUpError extends Error { + readonly lastError: unknown; + readonly attempts: number; + constructor(lastError: unknown, attempts: number) { + super(`Backoff gave up after ${attempts} attempts: ${lastError}`); + this.name = 'BackoffGaveUpError'; + this.lastError = lastError; + this.attempts = attempts; + } +} + export type BackoffFunc = (callback: () => Promise) => Promise; export function createBackoff( @@ -14,24 +25,31 @@ export function createBackoff( onError?: (e: any, failuresCount: number) => void, minDelay?: number, maxDelay?: number, - maxFailureCount?: number + maxFailureCount?: number, + maxRetries?: number, }): BackoffFunc { return async (callback: () => Promise): Promise => { - let currentFailureCount = 0; + let totalAttempts = 0; + let delayFailureCount = 0; const minDelay = opts && opts.minDelay !== undefined ? opts.minDelay : 250; const maxDelay = opts && opts.maxDelay !== undefined ? opts.maxDelay : 1000; const maxFailureCount = opts && opts.maxFailureCount !== undefined ? opts.maxFailureCount : 50; + const maxRetries = opts && opts.maxRetries !== undefined ? opts.maxRetries : undefined; while (true) { try { return await callback(); } catch (e) { - if (currentFailureCount < maxFailureCount) { - currentFailureCount++; + totalAttempts++; + if (delayFailureCount < maxFailureCount) { + delayFailureCount++; + } + if (maxRetries !== undefined && totalAttempts >= maxRetries) { + throw new BackoffGaveUpError(e, totalAttempts); } if (opts && opts.onError) { - opts.onError(e, currentFailureCount); + opts.onError(e, totalAttempts); } - let waitForRequest = exponentialBackoffDelay(currentFailureCount, minDelay, maxDelay, maxFailureCount); + let waitForRequest = exponentialBackoffDelay(delayFailureCount, minDelay, maxDelay, maxFailureCount); await delay(waitForRequest); } } diff --git a/packages/happy-cli/src/api/apiMachine.ts b/packages/happy-cli/src/api/apiMachine.ts index b8e2b570d..150e9480a 100644 --- a/packages/happy-cli/src/api/apiMachine.ts +++ b/packages/happy-cli/src/api/apiMachine.ts @@ -165,7 +165,7 @@ export class ApiMachineClient { await backoff(async () => { const updated = handler(this.machine.metadata); - const answer = await this.socket.emitWithAck('machine-update-metadata', { + const answer = await this.socket.timeout(15000).emitWithAck('machine-update-metadata', { machineId: this.machine.id, metadata: encodeBase64(encrypt(this.machine.encryptionKey, this.machine.encryptionVariant, updated)), expectedVersion: this.machine.metadataVersion @@ -193,7 +193,7 @@ export class ApiMachineClient { await backoff(async () => { const updated = handler(this.machine.daemonState); - const answer = await this.socket.emitWithAck('machine-update-state', { + const answer = await this.socket.timeout(15000).emitWithAck('machine-update-state', { machineId: this.machine.id, daemonState: encodeBase64(encrypt(this.machine.encryptionKey, this.machine.encryptionVariant, updated)), expectedVersion: this.machine.daemonStateVersion diff --git a/packages/happy-cli/src/api/apiSession.ts b/packages/happy-cli/src/api/apiSession.ts index f8dbf61cd..4142a54c3 100644 --- a/packages/happy-cli/src/api/apiSession.ts +++ b/packages/happy-cli/src/api/apiSession.ts @@ -112,8 +112,18 @@ export class ApiSessionClient extends EventEmitter { this.agentStateVersion = session.agentStateVersion; this.encryptionKey = session.encryptionKey; this.encryptionVariant = session.encryptionVariant; - this.sendSync = new InvalidateSync(() => this.flushOutbox()); - this.receiveSync = new InvalidateSync(() => this.fetchMessages()); + this.sendSync = new InvalidateSync(() => this.flushOutbox(), { + maxRetries: 30, + onError: (error) => { + logger.debug('[API] Send sync gave up:', { error }); + }, + }); + this.receiveSync = new InvalidateSync(() => this.fetchMessages(), { + maxRetries: 30, + onError: (error) => { + logger.debug('[API] Receive sync gave up:', { error }); + }, + }); // Initialize RPC handler manager this.rpcHandlerManager = new RpcHandlerManager({ @@ -529,7 +539,7 @@ export class ApiSessionClient extends EventEmitter { this.metadataLock.inLock(async () => { await backoff(async () => { let updated = handler(this.metadata!); // Weird state if metadata is null - should never happen but here we are - const answer = await this.socket.emitWithAck('update-metadata', { sid: this.sessionId, expectedVersion: this.metadataVersion, metadata: encodeBase64(encrypt(this.encryptionKey, this.encryptionVariant, updated)) }); + const answer = await this.socket.timeout(15000).emitWithAck('update-metadata', { sid: this.sessionId, expectedVersion: this.metadataVersion, metadata: encodeBase64(encrypt(this.encryptionKey, this.encryptionVariant, updated)) }); if (answer.result === 'success') { this.metadata = decrypt(this.encryptionKey, this.encryptionVariant, decodeBase64(answer.metadata)); this.metadataVersion = answer.version; @@ -555,7 +565,7 @@ export class ApiSessionClient extends EventEmitter { this.agentStateLock.inLock(async () => { await backoff(async () => { let updated = handler(this.agentState || {}); - const answer = await this.socket.emitWithAck('update-state', { sid: this.sessionId, expectedVersion: this.agentStateVersion, agentState: updated ? encodeBase64(encrypt(this.encryptionKey, this.encryptionVariant, updated)) : null }); + const answer = await this.socket.timeout(15000).emitWithAck('update-state', { sid: this.sessionId, expectedVersion: this.agentStateVersion, agentState: updated ? encodeBase64(encrypt(this.encryptionKey, this.encryptionVariant, updated)) : null }); if (answer.result === 'success') { this.agentState = answer.agentState ? decrypt(this.encryptionKey, this.encryptionVariant, decodeBase64(answer.agentState)) : null; this.agentStateVersion = answer.version; diff --git a/packages/happy-cli/src/utils/sync.ts b/packages/happy-cli/src/utils/sync.ts index ea1cdaa52..5d2c1d2cf 100644 --- a/packages/happy-cli/src/utils/sync.ts +++ b/packages/happy-cli/src/utils/sync.ts @@ -1,20 +1,42 @@ -import { backoff } from "@/utils/time"; +import { backoff, BackoffGaveUpError, createBackoff } from "@/utils/time"; export class InvalidateSync { private _invalidated = false; private _invalidatedDouble = false; private _stopped = false; + private _wedged = false; private _command: () => Promise; private _pendings: (() => void)[] = []; + private _onError?: (error: unknown) => void; + private _backoff: typeof backoff; - constructor(command: () => Promise) { + constructor(command: () => Promise, opts?: { + onError?: (error: unknown) => void; + maxRetries?: number; + }) { this._command = command; + this._onError = opts?.onError; + this._backoff = opts?.maxRetries !== undefined + ? createBackoff({ maxRetries: opts.maxRetries, onError: (e) => { console.warn(e); } }) + : backoff; + } + + get isWedged() { + return this._wedged; } invalidate() { if (this._stopped) { return; } + // If previously wedged, reset and allow a fresh sync attempt + if (this._wedged) { + this._wedged = false; + this._invalidated = true; + this._invalidatedDouble = false; + this._doSync(); + return; + } if (!this._invalidated) { this._invalidated = true; this._invalidatedDouble = false; @@ -36,6 +58,15 @@ export class InvalidateSync { }); } + async awaitQueue() { + if (this._stopped || (!this._invalidated && this._pendings.length === 0)) { + return; + } + await new Promise(resolve => { + this._pendings.push(resolve); + }); + } + stop() { if (this._stopped) { return; @@ -53,12 +84,26 @@ export class InvalidateSync { private _doSync = async () => { - await backoff(async () => { - if (this._stopped) { + try { + await this._backoff(async () => { + if (this._stopped) { + return; + } + await this._command(); + }); + } catch (e) { + if (e instanceof BackoffGaveUpError) { + console.error(`[InvalidateSync] Gave up after ${e.attempts} retries:`, e.lastError); + this._wedged = true; + this._invalidated = false; + this._notifyPendings(); + if (this._onError) { + this._onError(e); + } return; } - await this._command(); - }); + throw e; + } if (this._stopped) { this._notifyPendings(); return; @@ -71,4 +116,4 @@ export class InvalidateSync { this._notifyPendings(); } } -} \ No newline at end of file +} diff --git a/packages/happy-cli/src/utils/time.ts b/packages/happy-cli/src/utils/time.ts index 9570d114d..559d51ffd 100644 --- a/packages/happy-cli/src/utils/time.ts +++ b/packages/happy-cli/src/utils/time.ts @@ -7,6 +7,17 @@ export function exponentialBackoffDelay(currentFailureCount: number, minDelay: n return Math.round(Math.random() * maxDelayRet); } +export class BackoffGaveUpError extends Error { + readonly lastError: unknown; + readonly attempts: number; + constructor(lastError: unknown, attempts: number) { + super(`Backoff gave up after ${attempts} attempts: ${lastError}`); + this.name = 'BackoffGaveUpError'; + this.lastError = lastError; + this.attempts = attempts; + } +} + export type BackoffFunc = (callback: () => Promise) => Promise; export function createBackoff( @@ -14,28 +25,35 @@ export function createBackoff( onError?: (e: any, failuresCount: number) => void, minDelay?: number, maxDelay?: number, - maxFailureCount?: number + maxFailureCount?: number, + maxRetries?: number, }): BackoffFunc { return async (callback: () => Promise): Promise => { - let currentFailureCount = 0; + let totalAttempts = 0; + let delayFailureCount = 0; const minDelay = opts && opts.minDelay !== undefined ? opts.minDelay : 250; const maxDelay = opts && opts.maxDelay !== undefined ? opts.maxDelay : 1000; const maxFailureCount = opts && opts.maxFailureCount !== undefined ? opts.maxFailureCount : 50; + const maxRetries = opts && opts.maxRetries !== undefined ? opts.maxRetries : undefined; while (true) { try { return await callback(); } catch (e) { - if (currentFailureCount < maxFailureCount) { - currentFailureCount++; + totalAttempts++; + if (delayFailureCount < maxFailureCount) { + delayFailureCount++; + } + if (maxRetries !== undefined && totalAttempts >= maxRetries) { + throw new BackoffGaveUpError(e, totalAttempts); } if (opts && opts.onError) { - opts.onError(e, currentFailureCount); + opts.onError(e, totalAttempts); } - let waitForRequest = exponentialBackoffDelay(currentFailureCount, minDelay, maxDelay, maxFailureCount); + let waitForRequest = exponentialBackoffDelay(delayFailureCount, minDelay, maxDelay, maxFailureCount); await delay(waitForRequest); } } }; } -export let backoff = createBackoff(); \ No newline at end of file +export let backoff = createBackoff(); diff --git a/packages/happy-server/sources/app/api/routes/v3SessionRoutes.test.ts b/packages/happy-server/sources/app/api/routes/v3SessionRoutes.test.ts index bb73ecc61..6c6acbca0 100644 --- a/packages/happy-server/sources/app/api/routes/v3SessionRoutes.test.ts +++ b/packages/happy-server/sources/app/api/routes/v3SessionRoutes.test.ts @@ -127,12 +127,17 @@ const { if (typeof args?.where?.seq?.gt === "number") { rows = rows.filter((message) => message.seq > args.where.seq.gt); } + if (typeof args?.where?.seq?.lt === "number") { + rows = rows.filter((message) => message.seq < args.where.seq.lt); + } if (Array.isArray(args?.where?.localId?.in)) { const localIds = new Set(args.where.localId.in); rows = rows.filter((message) => localIds.has(message.localId)); } if (args?.orderBy?.seq === "asc") { rows.sort((a, b) => a.seq - b.seq); + } else if (args?.orderBy?.seq === "desc") { + rows.sort((a, b) => b.seq - a.seq); } if (args?.orderBy?.createdAt === "desc") { rows.sort((a, b) => b.createdAt.getTime() - a.createdAt.getTime()); @@ -436,6 +441,94 @@ describe("v3SessionRoutes", () => { expect(emitUpdateMock).toHaveBeenCalledTimes(1); }); + it("supports reverse pagination with before_seq", async () => { + seedSession({ id: "session-1", accountId: "user-1" }); + for (let seq = 1; seq <= 10; seq += 1) { + seedMessage({ sessionId: "session-1", seq, localId: `l${seq}`, content: { t: "encrypted", c: String(seq) } }); + } + + app = await createApp(); + + // Get the latest 3 messages (before seq 11 = all, limited to 3) + const page1 = await app.inject({ + method: "GET", + url: "/v3/sessions/session-1/messages?before_seq=11&limit=3", + headers: { "x-user-id": "user-1" } + }); + const body1 = page1.json(); + expect(body1.messages.map((m: any) => m.seq)).toEqual([8, 9, 10]); + expect(body1.hasMore).toBe(true); + + // Get the next 3 older messages + const page2 = await app.inject({ + method: "GET", + url: "/v3/sessions/session-1/messages?before_seq=8&limit=3", + headers: { "x-user-id": "user-1" } + }); + const body2 = page2.json(); + expect(body2.messages.map((m: any) => m.seq)).toEqual([5, 6, 7]); + expect(body2.hasMore).toBe(true); + + // Get the remaining older messages + const page3 = await app.inject({ + method: "GET", + url: "/v3/sessions/session-1/messages?before_seq=5&limit=10", + headers: { "x-user-id": "user-1" } + }); + const body3 = page3.json(); + expect(body3.messages.map((m: any) => m.seq)).toEqual([1, 2, 3, 4]); + expect(body3.hasMore).toBe(false); + }); + + it("rejects specifying both after_seq and before_seq", async () => { + seedSession({ id: "session-1", accountId: "user-1" }); + app = await createApp(); + + const response = await app.inject({ + method: "GET", + url: "/v3/sessions/session-1/messages?after_seq=0&before_seq=10", + headers: { "x-user-id": "user-1" } + }); + expect(response.statusCode).toBe(400); + }); + + it("returns latest messages when no cursor params specified (with limit)", async () => { + seedSession({ id: "session-1", accountId: "user-1" }); + for (let seq = 1; seq <= 10; seq += 1) { + seedMessage({ sessionId: "session-1", seq, localId: `l${seq}`, content: { t: "encrypted", c: String(seq) } }); + } + + app = await createApp(); + + // No after_seq or before_seq, just limit=3 → latest 3 messages + const response = await app.inject({ + method: "GET", + url: "/v3/sessions/session-1/messages?limit=3", + headers: { "x-user-id": "user-1" } + }); + const body = response.json(); + expect(response.statusCode).toBe(200); + expect(body.messages.map((m: any) => m.seq)).toEqual([8, 9, 10]); + expect(body.hasMore).toBe(true); + }); + + it("returns empty results for before_seq=1 (no messages before seq 1)", async () => { + seedSession({ id: "session-1", accountId: "user-1" }); + seedMessage({ sessionId: "session-1", seq: 1, localId: "l1", content: { t: "encrypted", c: "a" } }); + + app = await createApp(); + const response = await app.inject({ + method: "GET", + url: "/v3/sessions/session-1/messages?before_seq=1", + headers: { "x-user-id": "user-1" } + }); + + expect(response.statusCode).toBe(200); + const body = response.json(); + expect(body.messages).toEqual([]); + expect(body.hasMore).toBe(false); + }); + it("enforces send validation limits and auth/session ownership", async () => { seedSession({ id: "session-1", accountId: "owner-user" }); app = await createApp(); diff --git a/packages/happy-server/sources/app/api/routes/v3SessionRoutes.ts b/packages/happy-server/sources/app/api/routes/v3SessionRoutes.ts index 1a013da6e..6d81124d1 100644 --- a/packages/happy-server/sources/app/api/routes/v3SessionRoutes.ts +++ b/packages/happy-server/sources/app/api/routes/v3SessionRoutes.ts @@ -6,9 +6,13 @@ import { z } from "zod"; import { type Fastify } from "../types"; const getMessagesQuerySchema = z.object({ - after_seq: z.coerce.number().int().min(0).default(0), + after_seq: z.coerce.number().int().min(0).optional(), + before_seq: z.coerce.number().int().min(1).optional(), limit: z.coerce.number().int().min(1).max(500).default(100) -}); +}).refine( + (data) => !(data.after_seq !== undefined && data.before_seq !== undefined), + { message: "Cannot specify both after_seq and before_seq" } +); const sendMessagesBodySchema = z.object({ messages: z.array(z.object({ @@ -59,7 +63,7 @@ export function v3SessionRoutes(app: Fastify) { }, async (request, reply) => { const userId = request.userId; const { sessionId } = request.params; - const { after_seq, limit } = request.query; + const { after_seq, before_seq, limit } = request.query; const session = await db.session.findFirst({ where: { @@ -73,10 +77,46 @@ export function v3SessionRoutes(app: Fastify) { return reply.code(404).send({ error: 'Session not found' }); } + if (before_seq !== undefined || (after_seq === undefined && before_seq === undefined)) { + // Reverse pagination: newest-first + // - with before_seq: returns messages with seq < before_seq + // - without any cursor: returns the latest messages + const whereClause: { sessionId: string; seq?: { lt: number } } = { sessionId }; + if (before_seq !== undefined) { + whereClause.seq = { lt: before_seq }; + } + + const messages = await db.sessionMessage.findMany({ + where: whereClause, + orderBy: { seq: 'desc' }, + take: limit + 1, + select: { + id: true, + seq: true, + content: true, + localId: true, + createdAt: true, + updatedAt: true + } + }); + + const hasMore = messages.length > limit; + const page = hasMore ? messages.slice(0, limit) : messages; + // Return in ascending seq order + page.reverse(); + + return reply.send({ + messages: page.map(toResponseMessage), + hasMore + }); + } + + // Forward pagination: oldest-first (with after_seq) + const afterSeq = after_seq ?? 0; const messages = await db.sessionMessage.findMany({ where: { sessionId, - seq: { gt: after_seq } + seq: { gt: afterSeq } }, orderBy: { seq: 'asc' }, take: limit + 1, diff --git a/packages/happy-server/sources/app/api/socket/sessionUpdateHandler.ts b/packages/happy-server/sources/app/api/socket/sessionUpdateHandler.ts index 4b5cfb16e..46f8c4602 100644 --- a/packages/happy-server/sources/app/api/socket/sessionUpdateHandler.ts +++ b/packages/happy-server/sources/app/api/socket/sessionUpdateHandler.ts @@ -26,6 +26,9 @@ export function sessionUpdateHandler(userId: string, socket: Socket, connection: where: { id: sid, accountId: userId } }); if (!session) { + if (callback) { + callback({ result: 'error' }); + } return; } diff --git a/packages/happy-server/sources/app/events/eventRouter.ts b/packages/happy-server/sources/app/events/eventRouter.ts index 808ab40d8..18cab2b1f 100644 --- a/packages/happy-server/sources/app/events/eventRouter.ts +++ b/packages/happy-server/sources/app/events/eventRouter.ts @@ -146,6 +146,7 @@ export type UpdateEvent = { body: any; cursor: string; createdAt: number; + repeatKey: string | null; } | { type: 'kv-batch-update'; changes: Array<{ @@ -602,6 +603,7 @@ export function buildNewFeedPostUpdate(feedItem: { body: any; cursor: string; createdAt: number; + repeatKey?: string | null; }, updateSeq: number, updateId: string): UpdatePayload { return { id: updateId, @@ -611,7 +613,8 @@ export function buildNewFeedPostUpdate(feedItem: { id: feedItem.id, body: feedItem.body, cursor: feedItem.cursor, - createdAt: feedItem.createdAt + createdAt: feedItem.createdAt, + repeatKey: feedItem.repeatKey ?? null }, createdAt: Date.now() };