Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions packages/zero-client/src/client/crud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import type {TableSchema} from '../../../zero-schema/src/table-schema.ts';
import type {Schema} from '../../../zero-types/src/schema.ts';
import type {
CRUDExecutor,
CRUDExecutorOptions,
DeleteID,
InsertValue,
TableMutator,
UpdateValue,
UpsertOptions,
UpsertValue,
} from '../../../zql/src/mutate/crud.ts';
import * as crudImpl from './crud-impl.ts';
Expand Down Expand Up @@ -107,12 +109,13 @@ function makeEntityCRUDMutate<S extends TableSchema>(
};
return zeroCRUD({ops: [op]});
},
upsert: (value: UpsertValue<S>) => {
upsert: (value: UpsertValue<S>, options?: UpsertOptions<S>) => {
const op: UpsertOp = {
op: 'upsert',
tableName,
primaryKey,
value,
conflictColumns: options?.onConflict,
};
return zeroCRUD({ops: [op]});
},
Expand Down Expand Up @@ -158,12 +161,13 @@ export function makeBatchCRUDMutate<S extends TableSchema>(
ops.push(op);
return promiseVoid;
},
upsert: (value: UpsertValue<S>) => {
upsert: (value: UpsertValue<S>, options?: UpsertOptions<S>) => {
const op: UpsertOp = {
op: 'upsert',
tableName,
primaryKey,
value,
conflictColumns: options?.onConflict,
};
ops.push(op);
return promiseVoid;
Expand Down Expand Up @@ -207,12 +211,12 @@ export function makeCRUDExecutor(
schema: Schema,
ivmBranch: IVMSourceBranch | undefined,
): CRUDExecutor {
return (tableName, kind, value) => {
return (tableName, kind, value, options?: CRUDExecutorOptions) => {
const {primaryKey} = schema.tables[tableName];
return crudImpl[kind](
tx,
// oxlint-disable-next-line @typescript-eslint/no-explicit-any
{op: kind, tableName, primaryKey, value} as any,
{op: kind, tableName, primaryKey, value, conflictColumns: options?.onConflict} as any,
schema,
ivmBranch,
);
Expand All @@ -230,7 +234,11 @@ export function makeCRUDMutator(schema: Schema): CRUDMutator {
): Promise<void> => {
const executor = makeCRUDExecutor(tx, schema, undefined);
for (const op of crudArg.ops) {
await executor(op.tableName, op.op, op.value);
const options: CRUDExecutorOptions | undefined =
op.op === 'upsert' && op.conflictColumns
? {onConflict: op.conflictColumns}
: undefined;
await executor(op.tableName, op.op, op.value, options);
}
};
}
5 changes: 3 additions & 2 deletions packages/zero-client/src/client/mutate-request.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type {
InsertValue,
SchemaCRUD,
UpdateValue,
UpsertOptions,
UpsertValue,
} from '../../../zql/src/mutate/crud.ts';
import type {Transaction} from '../../../zql/src/mutate/custom.ts';
Expand Down Expand Up @@ -127,7 +128,7 @@ describe('zero.mutate(mr) with MutateRequest', () => {
(value: {id: string; readonly name?: string | undefined}) => Promise<void>
>();
expectTypeOf(z.mutate.user.upsert).toEqualTypeOf<
(value: {id: string; readonly name: string}) => Promise<void>
(value: {id: string; readonly name: string}, options?: UpsertOptions<typeof schema.tables.user>) => Promise<void>
>();
expectTypeOf(z.mutate.user.delete).toEqualTypeOf<
(id: {id: string}) => Promise<void>
Expand Down Expand Up @@ -420,7 +421,7 @@ describe('CRUD patterns on client', () => {
(value: UpdateValue<UserTableSchema>) => Promise<void>
>();
expectTypeOf<UserMutators['upsert']>().toEqualTypeOf<
(value: UpsertValue<UserTableSchema>) => Promise<void>
(value: UpsertValue<UserTableSchema>, options?: UpsertOptions<UserTableSchema>) => Promise<void>
>();
expectTypeOf<UserMutators['delete']>().toEqualTypeOf<
(id: DeleteID<UserTableSchema>) => Promise<void>
Expand Down
5 changes: 5 additions & 0 deletions packages/zero-protocol/src/push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,17 @@ const insertOpSchema = v.object({
/**
* Upsert semantics. Inserts if entity with id does not already exist,
* otherwise updates existing entity with id.
*
* By default, conflict detection uses the primary key. If `conflictColumns`
* is provided, those columns are used instead (for unique constraints).
*/
const upsertOpSchema = v.object({
op: v.literal('upsert'),
tableName: v.string(),
primaryKey: primaryKeySchema,
value: rowSchema,
// Optional: columns to use for ON CONFLICT instead of primary key
conflictColumns: v.array(v.string()).optional(),
});

/**
Expand Down
3 changes: 2 additions & 1 deletion packages/zero-server/src/custom.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type {
DeleteID,
InsertValue,
UpdateValue,
UpsertOptions,
UpsertValue,
} from '../../zql/src/mutate/crud.ts';
import type {DBTransaction} from '../../zql/src/mutate/custom.ts';
Expand Down Expand Up @@ -163,7 +164,7 @@ describe('server CRUD patterns', () => {
(value: UpdateValue<BasicTable>) => Promise<void>
>();
expectTypeOf<BasicCRUD['upsert']>().toEqualTypeOf<
(value: UpsertValue<BasicTable>) => Promise<void>
(value: UpsertValue<BasicTable>, options?: UpsertOptions<BasicTable>) => Promise<void>
>();
expectTypeOf<BasicCRUD['delete']>().toEqualTypeOf<
(id: DeleteID<BasicTable>) => Promise<void>
Expand Down
21 changes: 15 additions & 6 deletions packages/zero-server/src/custom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ import type {
} from '../../zero-types/src/server-schema.ts';
import {
type CRUDExecutor,
type CRUDExecutorOptions,
type CRUDKind,
makeCRUDMutate,
makeTransactionMutate,
type SchemaCRUD,
type TableCRUD,
type TransactionMutate,
type UpsertOptions,
} from '../../zql/src/mutate/crud.ts';
import type {
DBTransaction,
Expand Down Expand Up @@ -222,8 +224,11 @@ export class CRUDMutatorFactory<S extends Schema> {
mapValues(tableCRUD, method => method.bind(txHolder)),
) as unknown as SchemaCRUD<S>;

return (table: string, kind: CRUDKind, args: unknown) => {
return (table: string, kind: CRUDKind, args: unknown, options?: CRUDExecutorOptions) => {
const tableCRUD = boundCRUDs[table as keyof S['tables']];
if (kind === 'upsert' && options?.onConflict) {
return tableCRUD.upsert(args as never, {onConflict: options.onConflict});
}
// oxlint-disable-next-line @typescript-eslint/no-explicit-any
return (tableCRUD as any)[kind](args);
};
Expand Down Expand Up @@ -334,8 +339,11 @@ function makeServerCRUDExecutor<S extends Schema>(
) as unknown as SchemaCRUD<S>;

// Return executor that dispatches to bound methods
return (table: string, kind: CRUDKind, args: unknown) => {
return (table: string, kind: CRUDKind, args: unknown, options?: CRUDExecutorOptions) => {
const tableCRUD = boundCRUDs[table as keyof S['tables']];
if (kind === 'upsert' && options?.onConflict) {
return tableCRUD.upsert(args as never, {onConflict: options.onConflict});
}
// oxlint-disable-next-line @typescript-eslint/no-explicit-any
return (tableCRUD as any)[kind](args);
};
Expand Down Expand Up @@ -366,12 +374,13 @@ function makeServerTableCRUD(schema: TableSchema): TableCRUD<TableSchema> {
const tx = this[dbTxSymbol];
await tx.query(stmt.text, stmt.values);
},
async upsert(this: WithHiddenTxAndSchema, value) {
async upsert(this: WithHiddenTxAndSchema, value, options?: UpsertOptions<TableSchema>) {
value = removeUndefined(value);
const serverTableSchema = this[serverSchemaSymbol][serverName(schema)];
const targetedColumns = origAndServerNamesFor(Object.keys(value), schema);
const primaryKeyColumns = origAndServerNamesFor(
schema.primaryKey,
// use custom conflict columns if provided, otherwise fall back to primary key
const conflictColumns = origAndServerNamesFor(
options?.onConflict ?? schema.primaryKey,
schema,
);
const stmt = formatPgInternalConvert(
Expand All @@ -384,7 +393,7 @@ function makeServerTableCRUD(schema: TableSchema): TableCRUD<TableSchema> {
),
', ',
)}) ON CONFLICT (${sql.join(
primaryKeyColumns.map(([, serverName]) => sql.ident(serverName)),
conflictColumns.map(([, serverName]) => sql.ident(serverName)),
', ',
)}) DO UPDATE SET ${sql.join(
Object.entries(value).map(
Expand Down
57 changes: 43 additions & 14 deletions packages/zql/src/mutate/crud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@ export type SchemaCRUD<S extends Schema> = {

export type TransactionMutate<S extends Schema> = SchemaCRUD<S>;

/**
* Options for upsert operations.
*/
export type UpsertOptions<S extends TableSchema> = {
/**
* Columns to use for conflict detection instead of the primary key.
* Use this when upserting based on a unique constraint other than the primary key.
*/
onConflict?: (keyof S['columns'] & string)[];
};

export type TableCRUD<S extends TableSchema> = {
/**
* Writes a row if a row with the same primary key doesn't already exist.
Expand All @@ -21,11 +32,15 @@ export type TableCRUD<S extends TableSchema> = {

/**
* Writes a row unconditionally, overwriting any existing row with the same
* primary key. Non-primary-key fields that are 'optional' can be omitted or
* set to `undefined`. Such fields will be assigned the value `null`
* optimistically and then the default value as defined by the server.
* primary key (or custom conflict columns if specified). Non-primary-key
* fields that are 'optional' can be omitted or set to `undefined`. Such
* fields will be assigned the value `null` optimistically and then the
* default value as defined by the server.
*
* @param value - The row data to upsert
* @param options - Optional settings including custom conflict columns
*/
upsert: (value: UpsertValue<S>) => Promise<void>;
upsert: (value: UpsertValue<S>, options?: UpsertOptions<S>) => Promise<void>;

/**
* Updates a row with the same primary key. If no such row exists, this
Expand Down Expand Up @@ -90,11 +105,15 @@ export type TableMutator<TS extends TableSchema> = {

/**
* Writes a row unconditionally, overwriting any existing row with the same
* primary key. Non-primary-key fields that are 'optional' can be omitted or
* set to `undefined`. Such fields will be assigned the value `null`
* optimistically and then the default value as defined by the server.
* primary key (or custom conflict columns if specified). Non-primary-key
* fields that are 'optional' can be omitted or set to `undefined`. Such
* fields will be assigned the value `null` optimistically and then the
* default value as defined by the server.
*
* @param value - The row data to upsert
* @param options - Optional settings including custom conflict columns
*/
upsert: (value: UpsertValue<TS>) => Promise<void>;
upsert: (value: UpsertValue<TS>, options?: UpsertOptions<TS>) => Promise<void>;

/**
* Updates a row with the same primary key. If no such row exists, this
Expand All @@ -110,6 +129,14 @@ export type TableMutator<TS extends TableSchema> = {
delete: (id: DeleteID<TS>) => Promise<void>;
};

/**
* Options passed to the executor for CRUD operations.
*/
export type CRUDExecutorOptions = {
/** For upsert: columns to use for ON CONFLICT instead of primary key */
onConflict?: string[];
};

/**
* A function that executes a CRUD operation.
* Client and server provide different implementations.
Expand All @@ -118,6 +145,7 @@ export type CRUDExecutor = (
table: string,
kind: CRUDKind,
args: unknown,
options?: CRUDExecutorOptions,
) => Promise<void>;

/**
Expand Down Expand Up @@ -180,12 +208,13 @@ function makeTableCRUD(
tableName: string,
executor: CRUDExecutor,
): TableCRUD<TableSchema> {
return Object.fromEntries(
CRUD_KINDS.map(kind => [
kind,
(value: unknown) => executor(tableName, kind, value),
]),
) as TableCRUD<TableSchema>;
return {
insert: (value: unknown) => executor(tableName, 'insert', value),
upsert: (value: unknown, options?: UpsertOptions<TableSchema>) =>
executor(tableName, 'upsert', value, options?.onConflict ? {onConflict: options.onConflict} : undefined),
update: (value: unknown) => executor(tableName, 'update', value),
delete: (value: unknown) => executor(tableName, 'delete', value),
};
}

export type CRUDMutator<
Expand Down