Skip to content

Commit e4dda28

Browse files
authored
Merge pull request #3 from neriousy/feat/threads-stored-in-convex
feat: storing threads and messages in convex
2 parents b83e3f0 + 374cf58 commit e4dda28

File tree

21 files changed

+1990
-705
lines changed

21 files changed

+1990
-705
lines changed

apps/chat/convex/_generated/api.d.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ import type {
1414
FunctionReference,
1515
} from "convex/server";
1616
import type * as auth from "../auth.js";
17+
import type * as conversations from "../conversations.js";
1718
import type * as http from "../http.js";
19+
import type * as messages from "../messages.js";
1820
import type * as user from "../user.js";
1921

2022
/**
@@ -27,7 +29,9 @@ import type * as user from "../user.js";
2729
*/
2830
declare const fullApi: ApiFromModules<{
2931
auth: typeof auth;
32+
conversations: typeof conversations;
3033
http: typeof http;
34+
messages: typeof messages;
3135
user: typeof user;
3236
}>;
3337
export declare const api: FilterApi<

apps/chat/convex/conversations.ts

Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
import { getAuthUserId } from '@convex-dev/auth/server';
2+
import { query, mutation } from './_generated/server';
3+
import { v } from 'convex/values';
4+
import { Id } from './_generated/dataModel';
5+
6+
// Get user's conversations
7+
export const list = query({
8+
args: {},
9+
handler: async (ctx) => {
10+
const userId = await getAuthUserId(ctx);
11+
if (userId === null) {
12+
return [];
13+
}
14+
15+
// Get conversations ordered by most recent
16+
const conversations = await ctx.db
17+
.query('conversations')
18+
.withIndex('by_user_updated', (q) => q.eq('userId', userId))
19+
.order('desc')
20+
.collect();
21+
22+
// Get all messages for this user in a single query (fixes N+1 problem)
23+
const allUserMessages = await ctx.db
24+
.query('messages')
25+
.withIndex('by_user', (q) => q.eq('userId', userId))
26+
.collect();
27+
28+
// Group messages by conversation ID
29+
type Message = (typeof allUserMessages)[number];
30+
const messagesByConversationId = new Map<Id<'conversations'>, Message[]>();
31+
for (const message of allUserMessages) {
32+
const conversationId = message.conversationId;
33+
if (!messagesByConversationId.has(conversationId)) {
34+
messagesByConversationId.set(conversationId, []);
35+
}
36+
messagesByConversationId.get(conversationId)!.push(message);
37+
}
38+
39+
// Sort messages by createdAt for each conversation
40+
for (const messages of messagesByConversationId.values()) {
41+
messages.sort((a, b) => a.createdAt - b.createdAt);
42+
}
43+
44+
// Enrich conversations with their messages
45+
const conversationsWithMessages = conversations.map((conversation) => ({
46+
...conversation,
47+
messages: messagesByConversationId.get(conversation._id) || [],
48+
}));
49+
50+
return conversationsWithMessages;
51+
},
52+
});
53+
54+
// Get a specific conversation with messages
55+
export const get = query({
56+
args: { conversationId: v.id('conversations') },
57+
handler: async (ctx, args) => {
58+
const userId = await getAuthUserId(ctx);
59+
if (userId === null) {
60+
return null;
61+
}
62+
63+
const conversation = await ctx.db.get(args.conversationId);
64+
if (!conversation) {
65+
return null;
66+
}
67+
68+
// Verify ownership
69+
if (conversation.userId !== userId) {
70+
return null;
71+
}
72+
73+
// Get messages
74+
const messages = await ctx.db
75+
.query('messages')
76+
.withIndex('by_conversation_created', (q) =>
77+
q.eq('conversationId', conversation._id)
78+
)
79+
.collect();
80+
81+
return {
82+
...conversation,
83+
messages,
84+
};
85+
},
86+
});
87+
88+
// Create a new conversation
89+
export const create = mutation({
90+
args: {
91+
title: v.string(),
92+
model: v.string(),
93+
mcpEnabled: v.boolean(),
94+
},
95+
handler: async (ctx, args) => {
96+
const userId = await getAuthUserId(ctx);
97+
if (userId === null) {
98+
throw new Error('Not authenticated');
99+
}
100+
101+
// Create the conversation
102+
const conversationId = await ctx.db.insert('conversations', {
103+
userId: userId,
104+
title: args.title,
105+
model: args.model,
106+
mcpEnabled: args.mcpEnabled,
107+
isArchived: false,
108+
hasActiveStream: false,
109+
createdAt: Date.now(),
110+
updatedAt: Date.now(),
111+
});
112+
113+
// Return the full conversation object to avoid optimistic state drift
114+
const conversation = await ctx.db.get(conversationId);
115+
if (!conversation) {
116+
throw new Error('Failed to retrieve created conversation');
117+
}
118+
119+
return {
120+
...conversation,
121+
messages: [], // New conversations have no messages
122+
};
123+
},
124+
});
125+
126+
// Update a conversation
127+
export const update = mutation({
128+
args: {
129+
conversationId: v.id('conversations'),
130+
title: v.optional(v.string()),
131+
model: v.optional(v.string()),
132+
mcpEnabled: v.optional(v.boolean()),
133+
isArchived: v.optional(v.boolean()),
134+
hasActiveStream: v.optional(v.boolean()),
135+
streamState: v.optional(
136+
v.object({
137+
activeMessageId: v.optional(v.id('messages')),
138+
streamStartedAt: v.optional(v.number()),
139+
lastActivity: v.number(),
140+
})
141+
),
142+
},
143+
handler: async (ctx, args) => {
144+
const userId = await getAuthUserId(ctx);
145+
if (userId === null) {
146+
throw new Error('Not authenticated');
147+
}
148+
149+
const conversation = await ctx.db.get(args.conversationId);
150+
if (!conversation) {
151+
throw new Error('Conversation not found');
152+
}
153+
154+
// Verify ownership
155+
if (conversation.userId !== userId) {
156+
throw new Error('Not authorized');
157+
}
158+
159+
// Update conversation
160+
const updates: Partial<{
161+
title: string;
162+
model: string;
163+
mcpEnabled: boolean;
164+
isArchived: boolean;
165+
hasActiveStream: boolean;
166+
streamState: {
167+
activeMessageId?: Id<'messages'>;
168+
streamStartedAt?: number;
169+
lastActivity: number;
170+
};
171+
updatedAt: number;
172+
}> = {
173+
updatedAt: Date.now(),
174+
};
175+
176+
if (args.title !== undefined) updates.title = args.title;
177+
if (args.model !== undefined) updates.model = args.model;
178+
if (args.mcpEnabled !== undefined) updates.mcpEnabled = args.mcpEnabled;
179+
if (args.isArchived !== undefined) updates.isArchived = args.isArchived;
180+
if (args.hasActiveStream !== undefined)
181+
updates.hasActiveStream = args.hasActiveStream;
182+
if (args.streamState !== undefined) {
183+
updates.streamState = {
184+
...conversation.streamState,
185+
...args.streamState,
186+
};
187+
}
188+
189+
await ctx.db.patch(args.conversationId, updates);
190+
return args.conversationId;
191+
},
192+
});
193+
194+
// Delete a conversation
195+
export const remove = mutation({
196+
args: { conversationId: v.id('conversations') },
197+
handler: async (ctx, args) => {
198+
const userId = await getAuthUserId(ctx);
199+
if (userId === null) {
200+
throw new Error('Not authenticated');
201+
}
202+
203+
const conversation = await ctx.db.get(args.conversationId);
204+
if (!conversation) {
205+
throw new Error('Conversation not found');
206+
}
207+
208+
// Verify ownership
209+
if (conversation.userId !== userId) {
210+
throw new Error('Not authorized');
211+
}
212+
213+
// Delete all messages in the conversation
214+
const messages = await ctx.db
215+
.query('messages')
216+
.withIndex('by_conversation_created', (q) =>
217+
q.eq('conversationId', args.conversationId)
218+
)
219+
.collect();
220+
221+
for (const message of messages) {
222+
await ctx.db.delete(message._id);
223+
}
224+
225+
// Delete stream chunks
226+
const chunks = await ctx.db
227+
.query('streamChunks')
228+
.withIndex('by_conversation', (q) =>
229+
q.eq('conversationId', args.conversationId)
230+
)
231+
.collect();
232+
233+
for (const chunk of chunks) {
234+
await ctx.db.delete(chunk._id);
235+
}
236+
237+
// Delete stream sessions
238+
const sessions = await ctx.db
239+
.query('streamSessions')
240+
.withIndex('by_conversation', (q) =>
241+
q.eq('conversationId', args.conversationId)
242+
)
243+
.collect();
244+
245+
for (const session of sessions) {
246+
await ctx.db.delete(session._id);
247+
}
248+
249+
// Finally delete the conversation
250+
await ctx.db.delete(args.conversationId);
251+
return args.conversationId;
252+
},
253+
});
254+
255+
// Generate title for conversation
256+
export const generateTitle = mutation({
257+
args: {
258+
conversationId: v.id('conversations'),
259+
title: v.string(),
260+
},
261+
handler: async (ctx, args) => {
262+
const userId = await getAuthUserId(ctx);
263+
if (userId === null) {
264+
throw new Error('Not authenticated');
265+
}
266+
267+
const conversation = await ctx.db.get(args.conversationId);
268+
if (!conversation) {
269+
throw new Error('Conversation not found');
270+
}
271+
272+
// Verify ownership
273+
if (conversation.userId !== userId) {
274+
throw new Error('Not authorized');
275+
}
276+
277+
await ctx.db.patch(args.conversationId, {
278+
title: args.title,
279+
updatedAt: Date.now(),
280+
});
281+
282+
return args.conversationId;
283+
},
284+
});

0 commit comments

Comments
 (0)