diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7edc8326..d0f9fd6c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -34,6 +34,7 @@ jobs: - name: Run lint, test, and codegen run: | - npm run lint npm run test + npm run lint + npm run typecheck npx convex codegen && git diff --exit-code diff --git a/packages/convex-helpers/server/stream.test.ts b/packages/convex-helpers/server/stream.test.ts index 609fea28..2396775f 100644 --- a/packages/convex-helpers/server/stream.test.ts +++ b/packages/convex-helpers/server/stream.test.ts @@ -22,6 +22,15 @@ const schema = defineSchema({ d: v.number(), e: v.number(), }).index("cde", ["c", "d", "e"]), + channels: defineTable({ + workspaceId: v.string(), + isPublic: v.boolean(), + ownerId: v.string(), + }).index("by_isPublicWorkspace", ["isPublic", "workspaceId", "ownerId"]), + channelMembers: defineTable({ + channelId: v.id("channels"), + userId: v.string(), + }).index("by_user", ["userId"]), }); function stripSystemFields(doc: GenericDocument) { @@ -505,6 +514,61 @@ describe("stream", () => { ]); }); }); + + test("merge with flatMap and default index fields", async () => { + const t = convexTest(schema, modules); + await t.run(async (ctx) => { + const workspaceId = "w"; + const userId = "u"; + const privateChannelId = await ctx.db.insert("channels", { + workspaceId, + isPublic: false, + ownerId: userId, + }); + await ctx.db.insert("channelMembers", { + channelId: privateChannelId, + userId, + }); + await ctx.db.insert("channels", { + workspaceId, + isPublic: true, + ownerId: userId, + }); + + const userMemberships = stream(ctx.db, schema) + .query("channelMembers") + .withIndex("by_user", (q) => q.eq("userId", userId)); + + const privateChannels = userMemberships.flatMap( + async (membership) => + stream(ctx.db, schema) + .query("channels") + .withIndex("by_isPublicWorkspace", (q) => + q.eq("isPublic", false).eq("workspaceId", workspaceId), + ) + .filterWith( + async (channel) => channel._id === membership.channelId, + ), + [], + ); + + const publicChannels = stream(ctx.db, schema) + .query("channels") + .withIndex("by_isPublicWorkspace", (q) => + q.eq("isPublic", true).eq("workspaceId", workspaceId), + ); + + const merged = mergedStream( + [privateChannels, publicChannels], + ["userId"], + ); + const result = await merged.collect(); + expect(result.map(stripSystemFields)).toEqual([ + { workspaceId, isPublic: false }, + { workspaceId, isPublic: true }, + ]); + }); + }); test("streamIndexRange returns correct subset", async () => { const t = convexTest(schema, modules); await t.run(async (ctx) => { diff --git a/packages/convex-helpers/server/stream.ts b/packages/convex-helpers/server/stream.ts index 4f4ebb74..76af195e 100644 --- a/packages/convex-helpers/server/stream.ts +++ b/packages/convex-helpers/server/stream.ts @@ -291,7 +291,9 @@ export abstract class QueryStream mapper: (doc: T) => Promise>, mappedIndexFields: string[], ): QueryStream { - normalizeIndexFields(mappedIndexFields); + if (mappedIndexFields.length > 0) { + normalizeIndexFields(mappedIndexFields); + } return new FlatMapStream(this, mapper, mappedIndexFields); } @@ -1269,10 +1271,13 @@ class FlatMapStreamIterator< } else { innerStream = await this.#mapper(t); if ( + this.#mappedIndexFields.length > 0 && !equalIndexFields(innerStream.getIndexFields(), this.#mappedIndexFields) ) { throw new Error( - `FlatMapStream: inner stream has different index fields than expected: ${JSON.stringify(innerStream.getIndexFields())} vs ${JSON.stringify(this.#mappedIndexFields)}`, + `FlatMapStream: inner stream has different index fields than expected: ${JSON.stringify( + innerStream.getIndexFields(), + )} vs ${JSON.stringify(this.#mappedIndexFields)}`, ); } if (innerStream.getOrder() !== this.#outerStream.getOrder()) {