Skip to content

Commit 6889598

Browse files
committed
fix(postgres): ensure NULL values use consistent decoded types for SPI plan caching
When syncing NULL values first, PostgreSQL's SPI caches the prepared plan with the NULL's type. If a subsequent non-NULL value decodes to a different type, the plan fails. The fix maps column types to their decoded equivalents so NULL and non-NULL values always use consistent types (e.g., all integers use INT8OID, all floats use FLOAT8OID, most others use TEXTOID). Add map_column_oid_to_decoded_oid() to map column types to their decoded equivalents (INT2/4/8 → INT8, FLOAT4/8/NUMERIC → FLOAT8, BYTEA → BYTEA, others → TEXT). This ensures NULL and non-NULL values bind with the same type, preventing "there is no parameter $N" errors when NULL is synced before non-NULL values for the same column. Add tests 23 and 24 for UUID columns and comprehensive nullable type coverage (INT2/4/8, FLOAT4/8, NUMERIC, BYTEA, TEXT, VARCHAR, CHAR, UUID, JSON, JSONB, DATE, TIMESTAMP).
1 parent 2c5a107 commit 6889598

File tree

6 files changed

+947
-5
lines changed

6 files changed

+947
-5
lines changed

src/postgresql/cloudsync_postgresql.c

Lines changed: 87 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1638,6 +1638,77 @@ static int cloudsync_decode_value_cb (void *xdata, int index, int type, int64_t
16381638
return DBRES_OK;
16391639
}
16401640

1641+
// Map a column Oid to the decoded type Oid that would be used for non-NULL values.
1642+
// This ensures NULL and non-NULL values use consistent types for SPI plan caching.
1643+
// The mapping must match pgvalue_dbtype() in pgvalue.c which determines encode/decode types.
1644+
// For example, INT4OID columns decode to INT8OID, UUIDOID columns decode to TEXTOID.
1645+
static Oid map_column_oid_to_decoded_oid(Oid col_oid) {
1646+
switch (col_oid) {
1647+
// Integer types → INT8OID (all integers decode to int64)
1648+
// Must match DBTYPE_INTEGER cases in pgvalue_dbtype()
1649+
case INT2OID:
1650+
case INT4OID:
1651+
case INT8OID:
1652+
case BOOLOID: // BOOLEAN encodes/decodes as INTEGER
1653+
case CHAROID: // "char" encodes/decodes as INTEGER
1654+
case OIDOID: // OID encodes/decodes as INTEGER
1655+
return INT8OID;
1656+
// Float types → FLOAT8OID (all floats decode to double)
1657+
// Must match DBTYPE_FLOAT cases in pgvalue_dbtype()
1658+
case FLOAT4OID:
1659+
case FLOAT8OID:
1660+
case NUMERICOID:
1661+
return FLOAT8OID;
1662+
// Binary types → BYTEAOID
1663+
// Must match DBTYPE_BLOB cases in pgvalue_dbtype()
1664+
case BYTEAOID:
1665+
return BYTEAOID;
1666+
// All other types (text, varchar, uuid, json, date, timestamp, etc.) → TEXTOID
1667+
// These all encode/decode as DBTYPE_TEXT
1668+
default:
1669+
return TEXTOID;
1670+
}
1671+
}
1672+
1673+
// Get the Oid of a column from the system catalog.
1674+
// Requires SPI to be connected. Returns InvalidOid if not found.
1675+
static Oid get_column_oid(const char *schema, const char *table_name, const char *column_name) {
1676+
if (!table_name || !column_name) return InvalidOid;
1677+
1678+
const char *query =
1679+
"SELECT a.atttypid "
1680+
"FROM pg_attribute a "
1681+
"JOIN pg_class c ON c.oid = a.attrelid "
1682+
"LEFT JOIN pg_namespace n ON n.oid = c.relnamespace "
1683+
"WHERE c.relname = $1 "
1684+
"AND a.attname = $2 "
1685+
"AND a.attnum > 0 "
1686+
"AND NOT a.attisdropped "
1687+
"AND (n.nspname = $3 OR $3 IS NULL)";
1688+
1689+
Oid argtypes[3] = {TEXTOID, TEXTOID, TEXTOID};
1690+
Datum values[3];
1691+
char nulls[3] = {' ', ' ', schema ? ' ' : 'n'};
1692+
1693+
values[0] = CStringGetTextDatum(table_name);
1694+
values[1] = CStringGetTextDatum(column_name);
1695+
values[2] = schema ? CStringGetTextDatum(schema) : (Datum)0;
1696+
1697+
int ret = SPI_execute_with_args(query, 3, argtypes, values, nulls, true, 1);
1698+
1699+
pfree(DatumGetPointer(values[0]));
1700+
pfree(DatumGetPointer(values[1]));
1701+
if (schema) pfree(DatumGetPointer(values[2]));
1702+
1703+
if (ret != SPI_OK_SELECT || SPI_processed == 0) return InvalidOid;
1704+
1705+
bool isnull;
1706+
Datum col_oid = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &isnull);
1707+
if (isnull) return InvalidOid;
1708+
1709+
return DatumGetObjectId(col_oid);
1710+
}
1711+
16411712
// Decode encoded bytea into a pgvalue_t with the decoded base type.
16421713
// Type casting to the target column type is handled by the SQL statement.
16431714
static pgvalue_t *cloudsync_decode_bytea_to_pgvalue (bytea *encoded, bool *out_isnull) {
@@ -2247,9 +2318,23 @@ Datum cloudsync_changes_insert_trigger (PG_FUNCTION_ARGS) {
22472318
if (SPI_connect() != SPI_OK_CONNECT) ereport(ERROR, (errmsg("cloudsync: SPI_connect failed in trigger")));
22482319
spi_connected = true;
22492320

2250-
// Decode value to base type; SQL statement handles type casting via $n::typename
2321+
// Decode value to base type; SQL statement handles type casting via $n::typename.
2322+
// For non-NULL values, we get the decoded base type (INT8OID for integers, TEXTOID for text/UUID, etc).
2323+
// For NULL values, we must use the SAME decoded type that non-NULL values would use.
2324+
// This ensures type consistency across all calls, as SPI caches parameter types on first prepare.
22512325
if (!is_tombstone) {
2252-
col_value = cloudsync_decode_bytea_to_pgvalue(insert_value_encoded, NULL);
2326+
bool value_is_null = false;
2327+
col_value = cloudsync_decode_bytea_to_pgvalue(insert_value_encoded, &value_is_null);
2328+
2329+
// When value is NULL, create a typed NULL pgvalue with the decoded type.
2330+
// We map the column's actual Oid to the corresponding decoded Oid (e.g., INT4OID → INT8OID).
2331+
if (!col_value && value_is_null) {
2332+
Oid col_oid = get_column_oid(table_schema(table), insert_tbl, insert_name);
2333+
if (OidIsValid(col_oid)) {
2334+
Oid decoded_oid = map_column_oid_to_decoded_oid(col_oid);
2335+
col_value = pgvalue_create((Datum)0, decoded_oid, -1, InvalidOid, true);
2336+
}
2337+
}
22532338
}
22542339

22552340
int rc = DBRES_OK;

src/postgresql/database_postgresql.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2140,7 +2140,7 @@ int databasevm_bind_null (dbvm_t *vm, int index) {
21402140

21412141
pg_stmt_t *stmt = (pg_stmt_t*)vm;
21422142
stmt->values[idx] = (Datum)0;
2143-
stmt->types[idx] = BYTEAOID;
2143+
stmt->types[idx] = TEXTOID; // TEXTOID has casts to most types
21442144
stmt->nulls[idx] = 'n';
21452145

21462146
if (stmt->nparams < idx + 1) stmt->nparams = idx + 1;
@@ -2185,7 +2185,8 @@ int databasevm_bind_value (dbvm_t *vm, int index, dbvalue_t *value) {
21852185
pgvalue_t *v = (pgvalue_t *)value;
21862186
if (!v || v->isnull) {
21872187
stmt->values[idx] = (Datum)0;
2188-
stmt->types[idx] = TEXTOID;
2188+
// Use the actual column type if available, otherwise default to TEXTOID
2189+
stmt->types[idx] = (v && OidIsValid(v->typeid)) ? v->typeid : TEXTOID;
21892190
stmt->nulls[idx] = 'n';
21902191
} else {
21912192
int16 typlen;

test/postgresql/01_unittest.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ SELECT cloudsync_version() AS version \gset
2121

2222
-- Test uuid generation
2323
SELECT cloudsync_uuid() AS uuid1 \gset
24-
SELECT pg_sleep(0.1);
24+
SELECT pg_sleep(0.1) \gset
2525
SELECT cloudsync_uuid() AS uuid2 \gset
2626

2727
-- Test 1: Format check (UUID v7 has standard format: xxxxxxxx-xxxx-7xxx-xxxx-xxxxxxxxxxxx)

0 commit comments

Comments
 (0)