diff --git a/packages/zero-client/src/client/crud.ts b/packages/zero-client/src/client/crud.ts index 224e64a29d..f9fed81082 100644 --- a/packages/zero-client/src/client/crud.ts +++ b/packages/zero-client/src/client/crud.ts @@ -13,10 +13,12 @@ import type {TableSchema} from '../../../zero-schema/src/table-schema.ts'; import type {Schema} from '../../../zero-types/src/schema.ts'; import type { CRUDExecutor, + CRUDExecutorOptions, DeleteID, InsertValue, TableMutator, UpdateValue, + UpsertOptions, UpsertValue, } from '../../../zql/src/mutate/crud.ts'; import * as crudImpl from './crud-impl.ts'; @@ -107,12 +109,13 @@ function makeEntityCRUDMutate( }; return zeroCRUD({ops: [op]}); }, - upsert: (value: UpsertValue) => { + upsert: (value: UpsertValue, options?: UpsertOptions) => { const op: UpsertOp = { op: 'upsert', tableName, primaryKey, value, + conflictColumns: options?.onConflict, }; return zeroCRUD({ops: [op]}); }, @@ -158,12 +161,13 @@ export function makeBatchCRUDMutate( ops.push(op); return promiseVoid; }, - upsert: (value: UpsertValue) => { + upsert: (value: UpsertValue, options?: UpsertOptions) => { const op: UpsertOp = { op: 'upsert', tableName, primaryKey, value, + conflictColumns: options?.onConflict, }; ops.push(op); return promiseVoid; @@ -207,12 +211,12 @@ export function makeCRUDExecutor( schema: Schema, ivmBranch: IVMSourceBranch | undefined, ): CRUDExecutor { - return (tableName, kind, value) => { + return (tableName, kind, value, options?: CRUDExecutorOptions) => { const {primaryKey} = schema.tables[tableName]; return crudImpl[kind]( tx, // oxlint-disable-next-line @typescript-eslint/no-explicit-any - {op: kind, tableName, primaryKey, value} as any, + {op: kind, tableName, primaryKey, value, conflictColumns: options?.onConflict} as any, schema, ivmBranch, ); @@ -230,7 +234,11 @@ export function makeCRUDMutator(schema: Schema): CRUDMutator { ): Promise => { const executor = makeCRUDExecutor(tx, schema, undefined); for (const op of crudArg.ops) { - await executor(op.tableName, op.op, op.value); + const options: CRUDExecutorOptions | undefined = + op.op === 'upsert' && op.conflictColumns + ? {onConflict: op.conflictColumns} + : undefined; + await executor(op.tableName, op.op, op.value, options); } }; } diff --git a/packages/zero-client/src/client/mutate-request.test.ts b/packages/zero-client/src/client/mutate-request.test.ts index 6e23e89ced..3b88d1aa4d 100644 --- a/packages/zero-client/src/client/mutate-request.test.ts +++ b/packages/zero-client/src/client/mutate-request.test.ts @@ -6,6 +6,7 @@ import type { InsertValue, SchemaCRUD, UpdateValue, + UpsertOptions, UpsertValue, } from '../../../zql/src/mutate/crud.ts'; import type {Transaction} from '../../../zql/src/mutate/custom.ts'; @@ -127,7 +128,7 @@ describe('zero.mutate(mr) with MutateRequest', () => { (value: {id: string; readonly name?: string | undefined}) => Promise >(); expectTypeOf(z.mutate.user.upsert).toEqualTypeOf< - (value: {id: string; readonly name: string}) => Promise + (value: {id: string; readonly name: string}, options?: UpsertOptions) => Promise >(); expectTypeOf(z.mutate.user.delete).toEqualTypeOf< (id: {id: string}) => Promise @@ -420,7 +421,7 @@ describe('CRUD patterns on client', () => { (value: UpdateValue) => Promise >(); expectTypeOf().toEqualTypeOf< - (value: UpsertValue) => Promise + (value: UpsertValue, options?: UpsertOptions) => Promise >(); expectTypeOf().toEqualTypeOf< (id: DeleteID) => Promise diff --git a/packages/zero-protocol/src/push.ts b/packages/zero-protocol/src/push.ts index 1a8419f37e..d21a0c0182 100644 --- a/packages/zero-protocol/src/push.ts +++ b/packages/zero-protocol/src/push.ts @@ -52,12 +52,17 @@ const insertOpSchema = v.object({ /** * Upsert semantics. Inserts if entity with id does not already exist, * otherwise updates existing entity with id. + * + * By default, conflict detection uses the primary key. If `conflictColumns` + * is provided, those columns are used instead (for unique constraints). */ const upsertOpSchema = v.object({ op: v.literal('upsert'), tableName: v.string(), primaryKey: primaryKeySchema, value: rowSchema, + // Optional: columns to use for ON CONFLICT instead of primary key + conflictColumns: v.array(v.string()).optional(), }); /** diff --git a/packages/zero-server/src/custom.test.ts b/packages/zero-server/src/custom.test.ts index b84811d340..2bef4f03b8 100644 --- a/packages/zero-server/src/custom.test.ts +++ b/packages/zero-server/src/custom.test.ts @@ -12,6 +12,7 @@ import type { DeleteID, InsertValue, UpdateValue, + UpsertOptions, UpsertValue, } from '../../zql/src/mutate/crud.ts'; import type {DBTransaction} from '../../zql/src/mutate/custom.ts'; @@ -163,7 +164,7 @@ describe('server CRUD patterns', () => { (value: UpdateValue) => Promise >(); expectTypeOf().toEqualTypeOf< - (value: UpsertValue) => Promise + (value: UpsertValue, options?: UpsertOptions) => Promise >(); expectTypeOf().toEqualTypeOf< (id: DeleteID) => Promise diff --git a/packages/zero-server/src/custom.ts b/packages/zero-server/src/custom.ts index 91e2e18600..31d71b00c1 100644 --- a/packages/zero-server/src/custom.ts +++ b/packages/zero-server/src/custom.ts @@ -15,12 +15,14 @@ import type { } from '../../zero-types/src/server-schema.ts'; import { type CRUDExecutor, + type CRUDExecutorOptions, type CRUDKind, makeCRUDMutate, makeTransactionMutate, type SchemaCRUD, type TableCRUD, type TransactionMutate, + type UpsertOptions, } from '../../zql/src/mutate/crud.ts'; import type { DBTransaction, @@ -222,8 +224,11 @@ export class CRUDMutatorFactory { mapValues(tableCRUD, method => method.bind(txHolder)), ) as unknown as SchemaCRUD; - return (table: string, kind: CRUDKind, args: unknown) => { + return (table: string, kind: CRUDKind, args: unknown, options?: CRUDExecutorOptions) => { const tableCRUD = boundCRUDs[table as keyof S['tables']]; + if (kind === 'upsert' && options?.onConflict) { + return tableCRUD.upsert(args as never, {onConflict: options.onConflict}); + } // oxlint-disable-next-line @typescript-eslint/no-explicit-any return (tableCRUD as any)[kind](args); }; @@ -334,8 +339,11 @@ function makeServerCRUDExecutor( ) as unknown as SchemaCRUD; // Return executor that dispatches to bound methods - return (table: string, kind: CRUDKind, args: unknown) => { + return (table: string, kind: CRUDKind, args: unknown, options?: CRUDExecutorOptions) => { const tableCRUD = boundCRUDs[table as keyof S['tables']]; + if (kind === 'upsert' && options?.onConflict) { + return tableCRUD.upsert(args as never, {onConflict: options.onConflict}); + } // oxlint-disable-next-line @typescript-eslint/no-explicit-any return (tableCRUD as any)[kind](args); }; @@ -366,12 +374,13 @@ function makeServerTableCRUD(schema: TableSchema): TableCRUD { const tx = this[dbTxSymbol]; await tx.query(stmt.text, stmt.values); }, - async upsert(this: WithHiddenTxAndSchema, value) { + async upsert(this: WithHiddenTxAndSchema, value, options?: UpsertOptions) { value = removeUndefined(value); const serverTableSchema = this[serverSchemaSymbol][serverName(schema)]; const targetedColumns = origAndServerNamesFor(Object.keys(value), schema); - const primaryKeyColumns = origAndServerNamesFor( - schema.primaryKey, + // use custom conflict columns if provided, otherwise fall back to primary key + const conflictColumns = origAndServerNamesFor( + options?.onConflict ?? schema.primaryKey, schema, ); const stmt = formatPgInternalConvert( @@ -384,7 +393,7 @@ function makeServerTableCRUD(schema: TableSchema): TableCRUD { ), ', ', )}) ON CONFLICT (${sql.join( - primaryKeyColumns.map(([, serverName]) => sql.ident(serverName)), + conflictColumns.map(([, serverName]) => sql.ident(serverName)), ', ', )}) DO UPDATE SET ${sql.join( Object.entries(value).map( diff --git a/packages/zql/src/mutate/crud.ts b/packages/zql/src/mutate/crud.ts index 4ff4ada8d9..630154e3c1 100644 --- a/packages/zql/src/mutate/crud.ts +++ b/packages/zql/src/mutate/crud.ts @@ -10,6 +10,17 @@ export type SchemaCRUD = { export type TransactionMutate = SchemaCRUD; +/** + * Options for upsert operations. + */ +export type UpsertOptions = { + /** + * Columns to use for conflict detection instead of the primary key. + * Use this when upserting based on a unique constraint other than the primary key. + */ + onConflict?: (keyof S['columns'] & string)[]; +}; + export type TableCRUD = { /** * Writes a row if a row with the same primary key doesn't already exist. @@ -21,11 +32,15 @@ export type TableCRUD = { /** * Writes a row unconditionally, overwriting any existing row with the same - * primary key. Non-primary-key fields that are 'optional' can be omitted or - * set to `undefined`. Such fields will be assigned the value `null` - * optimistically and then the default value as defined by the server. + * primary key (or custom conflict columns if specified). Non-primary-key + * fields that are 'optional' can be omitted or set to `undefined`. Such + * fields will be assigned the value `null` optimistically and then the + * default value as defined by the server. + * + * @param value - The row data to upsert + * @param options - Optional settings including custom conflict columns */ - upsert: (value: UpsertValue) => Promise; + upsert: (value: UpsertValue, options?: UpsertOptions) => Promise; /** * Updates a row with the same primary key. If no such row exists, this @@ -90,11 +105,15 @@ export type TableMutator = { /** * Writes a row unconditionally, overwriting any existing row with the same - * primary key. Non-primary-key fields that are 'optional' can be omitted or - * set to `undefined`. Such fields will be assigned the value `null` - * optimistically and then the default value as defined by the server. + * primary key (or custom conflict columns if specified). Non-primary-key + * fields that are 'optional' can be omitted or set to `undefined`. Such + * fields will be assigned the value `null` optimistically and then the + * default value as defined by the server. + * + * @param value - The row data to upsert + * @param options - Optional settings including custom conflict columns */ - upsert: (value: UpsertValue) => Promise; + upsert: (value: UpsertValue, options?: UpsertOptions) => Promise; /** * Updates a row with the same primary key. If no such row exists, this @@ -110,6 +129,14 @@ export type TableMutator = { delete: (id: DeleteID) => Promise; }; +/** + * Options passed to the executor for CRUD operations. + */ +export type CRUDExecutorOptions = { + /** For upsert: columns to use for ON CONFLICT instead of primary key */ + onConflict?: string[]; +}; + /** * A function that executes a CRUD operation. * Client and server provide different implementations. @@ -118,6 +145,7 @@ export type CRUDExecutor = ( table: string, kind: CRUDKind, args: unknown, + options?: CRUDExecutorOptions, ) => Promise; /** @@ -180,12 +208,13 @@ function makeTableCRUD( tableName: string, executor: CRUDExecutor, ): TableCRUD { - return Object.fromEntries( - CRUD_KINDS.map(kind => [ - kind, - (value: unknown) => executor(tableName, kind, value), - ]), - ) as TableCRUD; + return { + insert: (value: unknown) => executor(tableName, 'insert', value), + upsert: (value: unknown, options?: UpsertOptions) => + executor(tableName, 'upsert', value, options?.onConflict ? {onConflict: options.onConflict} : undefined), + update: (value: unknown) => executor(tableName, 'update', value), + delete: (value: unknown) => executor(tableName, 'delete', value), + }; } export type CRUDMutator<