Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-sync-duplicate-key-utils.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@tanstack/db': patch
---

Fix live query includes reconciliation so updates that re-emit existing child rows update internal child collections instead of attempting duplicate inserts, and ensure duplicate-key sync errors handle collection configs without live query internals.
7 changes: 4 additions & 3 deletions packages/db/src/collection/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,10 @@ export class CollectionSyncManager<
// throwing a duplicate-key error during reconciliation.
messageType = `update`
} else {
const utils = this.config
.utils as Partial<LiveQueryCollectionUtils>
const internal = utils[LIVE_QUERY_INTERNAL]
const utils = this.config.utils as
| Partial<LiveQueryCollectionUtils>
| undefined
const internal = utils?.[LIVE_QUERY_INTERNAL]
throw new DuplicateKeySyncError(key, this.id, {
hasCustomGetKey: internal?.hasCustomGetKey ?? false,
hasJoins: internal?.hasJoins ?? false,
Expand Down
15 changes: 10 additions & 5 deletions packages/db/src/query/live/collection-config-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1671,14 +1671,19 @@ function flushIncludesState(
if (entry.orderByIndices && change.orderByIndex !== undefined) {
entry.orderByIndices.set(change.value, change.orderByIndex)
}
const key = entry.syncMethods.collection.getKeyFromItem(
change.value,
)
const childAlreadyExists = entry.syncMethods.collection.has(key)

if (change.inserts > 0 && change.deletes === 0) {
entry.syncMethods.write({ value: change.value, type: `insert` })
entry.syncMethods.write({
value: change.value,
type: childAlreadyExists ? `update` : `insert`,
})
} else if (
change.inserts > change.deletes ||
(change.inserts === change.deletes &&
entry.syncMethods.collection.has(
entry.syncMethods.collection.getKeyFromItem(change.value),
))
(change.inserts === change.deletes && childAlreadyExists)
) {
entry.syncMethods.write({ value: change.value, type: `update` })
} else if (change.deletes > 0) {
Expand Down
37 changes: 36 additions & 1 deletion packages/db/tests/collection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { createCollection } from '../src/collection/index.js'
import {
CollectionRequiresConfigError,
DuplicateKeyError,
DuplicateKeySyncError,
InvalidKeyError,
KeyUpdateNotAllowedError,
MissingDeleteHandlerError,
Expand All @@ -18,7 +19,12 @@ import {
stripVirtualProps,
withExpectedRejection,
} from './utils'
import type { ChangeMessage, MutationFn, PendingMutation } from '../src/types'
import type {
ChangeMessage,
MutationFn,
PendingMutation,
SyncConfig,
} from '../src/types'

const getStateValue = <T extends object, TKey extends string | number>(
collection: { state: Map<TKey, T> },
Expand All @@ -42,6 +48,35 @@ describe(`Collection`, () => {
expect(() => createCollection()).toThrow(CollectionRequiresConfigError)
})

it(`throws DuplicateKeySyncError instead of TypeError when config has no utils`, async () => {
let begin!: () => void
let write!: Parameters<
SyncConfig<{ id: number; text: string }, number>[`sync`]
>[0][`write`]

const collection = createCollection<{ id: number; text: string }, number>({
id: `duplicate-key-no-utils-test`,
getKey: (item) => item.id,
sync: {
sync: (params) => {
begin = params.begin
write = params.write
params.begin()
params.write({ type: `insert`, value: { id: 1, text: `one` } })
params.commit()
params.markReady()
},
},
})

await collection.stateWhenReady()

begin()
expect(() =>
write({ type: `insert`, value: { id: 1, text: `changed` } }),
).toThrow(DuplicateKeySyncError)
})

it(`removes optimistic insert when sync confirms with a different server-generated key`, async () => {
const options = mockSyncCollectionOptionsNoInitialState<{
id: number
Expand Down
54 changes: 54 additions & 0 deletions packages/db/tests/query/includes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,60 @@ describe(`includes subqueries`, () => {
})

describe(`change propagation`, () => {
it(`Collection includes: joined child update does not duplicate insert into child collection`, async () => {
type LineItem = { id: number; productId: number; qty: number }
type Product = { id: number; categoryId: number; name: string }

const lineItems = createCollection(
mockSyncCollectionOptions<LineItem>({
id: `includes-line-items`,
getKey: (lineItem) => lineItem.id,
initialData: [{ id: 1, productId: 10, qty: 1 }],
}),
)
const products = createCollection(
mockSyncCollectionOptions<Product>({
id: `includes-products`,
getKey: (product) => product.id,
initialData: [{ id: 10, categoryId: 1, name: `Widget` }],
}),
)

const collection = createLiveQueryCollection((q) =>
q.from({ lineItem: lineItems }).select(({ lineItem }) => ({
id: lineItem.id,
product: q
.from({ product: products })
.where(({ product }) => eq(product.id, lineItem.productId))
.select(({ product }) => ({
id: product.id,
categoryId: product.categoryId,
name: product.name,
})),
})),
)
await collection.preload()

lineItems.utils.begin()
expect(() => {
lineItems.utils.write({
type: `delete`,
value: { id: 1, productId: 10, qty: 1 },
})
lineItems.utils.write({
type: `insert`,
value: { id: 1, productId: 10, qty: 2 },
})
}).not.toThrow()
lineItems.utils.commit()

await vi.waitFor(() => {
expect(childItems((collection.get(1) as any).product)).toEqual([
{ id: 10, categoryId: 1, name: `Widget` },
])
})
})

it(`Collection includes: child change does not re-emit the parent row`, async () => {
const collection = buildIncludesQuery()
await collection.preload()
Expand Down
74 changes: 73 additions & 1 deletion packages/query-db-collection/tests/query.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import {
inArray,
or,
} from '@tanstack/db'
import { stripVirtualProps } from '../../db/tests/utils'
import {
mockSyncCollectionOptions,
stripVirtualProps,
} from '../../db/tests/utils'
import { persistedCollectionOptions } from '../../db-sqlite-persistence-core/src'
import { queryCollectionOptions } from '../src/query'
import type { QueryFunctionContext } from '@tanstack/query-core'
Expand Down Expand Up @@ -228,6 +231,75 @@ describe(`QueryCollection`, () => {
expect(collection._state.syncedData.get(`2`)).toEqual(initialItems[1])
})

it(`should not duplicate insert into includes child collection after update refetch`, async () => {
type LineItem = { id: string; productId: string }
type Product = { id: string; categoryId: number; name: string }

const lineItems = createCollection(
mockSyncCollectionOptions<LineItem>({
id: `query-collection-line-items`,
getKey: (lineItem) => lineItem.id,
initialData: [{ id: `line-1`, productId: `product-1` }],
}),
)

let productsData: Array<Product> = [
{ id: `product-1`, categoryId: 1, name: `Widget` },
]

const products = createCollection(
queryCollectionOptions<Product>({
id: `query-collection-products`,
queryClient,
queryKey: [`products`],
queryFn: vi
.fn()
.mockImplementation(() => Promise.resolve(productsData)),
getKey: (product) => product.id,
startSync: true,
onUpdate: async ({ transaction }) => {
for (const mutation of transaction.mutations) {
productsData = productsData.map((product) =>
product.id === mutation.key ? mutation.modified : product,
)
}
},
}),
)

const collection = createLiveQueryCollection((q) =>
q.from({ lineItem: lineItems }).select(({ lineItem }) => ({
id: lineItem.id,
product: q
.from({ product: products })
.where(({ product }) => eq(product.id, lineItem.productId))
.select(({ product }) => ({
id: product.id,
categoryId: product.categoryId,
name: product.name,
})),
})),
)

await collection.preload()

expect(() => {
products.update(`product-1`, (draft) => {
draft.categoryId = 2
})
}).not.toThrow()

await vi.waitFor(() => {
expect(
stripVirtualProps((collection.get(`line-1`) as any).product.toArray[0]),
).toEqual({
id: `product-1`,
categoryId: 2,
name: `Widget`,
})
})
Comment thread
coderabbitai[bot] marked this conversation as resolved.
})

it(`should update collection when query data changes`, async () => {
const queryKey = [`testItems`]
const initialItems: Array<TestItem> = [
Expand Down
Loading