Commit edfbc14

Merge branch 'dev' into sync-filter

2 parents: 15933cc + 47180e7

18 files changed: +2822 -81 lines

.claude/commands/test-sync-roundtrip-rls.md

Lines changed: 532 additions & 0 deletions
Large diffs are not rendered by default.

.github/workflows/main.yml

Lines changed: 32 additions & 1 deletion
@@ -226,10 +226,41 @@ jobs:
           path: dist/${{ matrix.name == 'apple-xcframework' && 'CloudSync.*' || 'cloudsync.*'}}
           if-no-files-found: error
 
+  postgres-test:
+    runs-on: ubuntu-22.04
+    name: postgresql build + test
+    timeout-minutes: 10
+
+    steps:
+
+      - uses: actions/checkout@v4.2.2
+
+      - name: build and start postgresql container
+        run: make postgres-docker-rebuild
+
+      - name: wait for postgresql to be ready
+        run: |
+          for i in $(seq 1 30); do
+            if docker exec cloudsync-postgres pg_isready -U postgres > /dev/null 2>&1; then
+              echo "PostgreSQL is ready"
+              exit 0
+            fi
+            sleep 2
+          done
+          echo "PostgreSQL failed to start within 60s"
+          docker logs cloudsync-postgres
+          exit 1
+
+      - name: run postgresql tests
+        run: |
+          docker exec cloudsync-postgres mkdir -p /tmp/cloudsync/test
+          docker cp test/postgresql cloudsync-postgres:/tmp/cloudsync/test/postgresql
+          docker exec cloudsync-postgres psql -U postgres -d postgres -f /tmp/cloudsync/test/postgresql/full_test.sql
+
   release:
     runs-on: ubuntu-22.04
     name: release
-    needs: build
+    needs: [build, postgres-test]
     if: github.ref == 'refs/heads/main'
 
     env:

docker/Makefile.postgresql

Lines changed: 15 additions & 15 deletions
@@ -137,32 +137,32 @@ PG_DOCKER_DB_PASSWORD ?= postgres
 
 # Build Docker image with pre-installed extension
 postgres-docker-build:
-	@echo "Building Docker image via docker-compose (rebuilt when sources change)..."
+	@echo "Building Docker image via docker compose (rebuilt when sources change)..."
 	# To force plaintext BuildKit logs, run: make postgres-docker-build DOCKER_BUILD_ARGS="--progress=plain"
-	cd docker/postgresql && docker-compose build $(DOCKER_BUILD_ARGS)
+	cd docker/postgresql && docker compose build $(DOCKER_BUILD_ARGS)
 	@echo ""
 	@echo "Docker image built successfully!"
 
 # Build Docker image with AddressSanitizer enabled (override compose file)
 postgres-docker-build-asan:
-	@echo "Building Docker image with ASAN via docker-compose..."
+	@echo "Building Docker image with ASAN via docker compose..."
 	# To force plaintext BuildKit logs, run: make postgres-docker-build-asan DOCKER_BUILD_ARGS=\"--progress=plain\"
-	cd docker/postgresql && docker-compose -f docker-compose.debug.yml -f docker-compose.asan.yml build $(DOCKER_BUILD_ARGS)
+	cd docker/postgresql && docker compose -f docker-compose.debug.yml -f docker-compose.asan.yml build $(DOCKER_BUILD_ARGS)
 	@echo ""
 	@echo "ASAN Docker image built successfully!"
 
 # Build Docker image using docker-compose.debug.yml
 postgres-docker-debug-build:
-	@echo "Building debug Docker image via docker-compose..."
+	@echo "Building debug Docker image via docker compose..."
 	# To force plaintext BuildKit logs, run: make postgres-docker-debug-build DOCKER_BUILD_ARGS=\"--progress=plain\"
-	cd docker/postgresql && docker-compose -f docker-compose.debug.yml build $(DOCKER_BUILD_ARGS)
+	cd docker/postgresql && docker compose -f docker-compose.debug.yml build $(DOCKER_BUILD_ARGS)
 	@echo ""
 	@echo "Debug Docker image built successfully!"
 
 # Run PostgreSQL container with CloudSync
 postgres-docker-run:
 	@echo "Starting PostgreSQL with CloudSync..."
-	cd docker/postgresql && docker-compose up -d --build
+	cd docker/postgresql && docker compose up -d --build
 	@echo ""
 	@echo "Container started successfully!"
 	@echo ""
@@ -179,7 +179,7 @@ postgres-docker-run:
 # Run PostgreSQL container with CloudSync and AddressSanitizer enabled
 postgres-docker-run-asan:
 	@echo "Starting PostgreSQL with CloudSync (ASAN enabled)..."
-	cd docker/postgresql && docker-compose -f docker-compose.debug.yml -f docker-compose.asan.yml up -d --build
+	cd docker/postgresql && docker compose -f docker-compose.debug.yml -f docker-compose.asan.yml up -d --build
 	@echo ""
 	@echo "Container started successfully!"
 	@echo ""
@@ -196,7 +196,7 @@ postgres-docker-run-asan:
 # Run PostgreSQL container using docker-compose.debug.yml
 postgres-docker-debug-run:
 	@echo "Starting PostgreSQL with CloudSync (debug compose)..."
-	cd docker/postgresql && docker-compose -f docker-compose.debug.yml up -d --build
+	cd docker/postgresql && docker compose -f docker-compose.debug.yml up -d --build
 	@echo ""
 	@echo "Container started successfully!"
 	@echo ""
@@ -213,21 +213,21 @@ postgres-docker-debug-run:
 # Stop PostgreSQL container
 postgres-docker-stop:
 	@echo "Stopping PostgreSQL container..."
-	cd docker/postgresql && docker-compose down
+	cd docker/postgresql && docker compose down
 	@echo "Container stopped"
 
 # Rebuild and restart container
 postgres-docker-rebuild: postgres-docker-build
 	@echo "Rebuilding and restarting container..."
-	cd docker/postgresql && docker-compose down
-	cd docker/postgresql && docker-compose up -d --build
+	cd docker/postgresql && docker compose down
+	cd docker/postgresql && docker compose up -d --build
 	@echo "Container restarted with new image"
 
 # Rebuild and restart container using docker-compose.debug.yml
 postgres-docker-debug-rebuild: postgres-docker-debug-build
 	@echo "Rebuilding and restarting debug container..."
-	cd docker/postgresql && docker-compose -f docker-compose.debug.yml down
-	cd docker/postgresql && docker-compose -f docker-compose.debug.yml up -d --build
+	cd docker/postgresql && docker compose -f docker-compose.debug.yml down
+	cd docker/postgresql && docker compose -f docker-compose.debug.yml up -d --build
 	@echo "Debug container restarted with new image"
 
 # Interactive shell in container
@@ -353,5 +353,5 @@ postgres-help:
 # Simple smoke test: rebuild image/container, create extension, and query version
 unittest-pg: postgres-docker-rebuild
 	@echo "Running PostgreSQL extension smoke test..."
-	cd docker/postgresql && docker-compose exec -T postgres psql -U postgres -d cloudsync_test -f /tmp/cloudsync/docker/postgresql/smoke_test.sql
+	cd docker/postgresql && docker compose exec -T postgres psql -U postgres -d cloudsync_test -f /tmp/cloudsync/docker/postgresql/smoke_test.sql
 	@echo "Smoke test completed."

src/cloudsync.c

Lines changed: 22 additions & 22 deletions
@@ -49,12 +49,12 @@
 #define CLOUDSYNC_INIT_NTABLES 64
 #define CLOUDSYNC_MIN_DB_VERSION 0
 
-#define CLOUDSYNC_PAYLOAD_SKIP_SCHEMA_HASH_CHECK 1
 #define CLOUDSYNC_PAYLOAD_MINBUF_SIZE (512*1024)
 #define CLOUDSYNC_PAYLOAD_SIGNATURE 0x434C5359 /* 'C','L','S','Y' */
 #define CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL 1
 #define CLOUDSYNC_PAYLOAD_VERSION_1 CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL
 #define CLOUDSYNC_PAYLOAD_VERSION_2 2
+#define CLOUDSYNC_PAYLOAD_VERSION_LATEST CLOUDSYNC_PAYLOAD_VERSION_2
 #define CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM CLOUDSYNC_PAYLOAD_VERSION_2
 
 #ifndef MAX
@@ -63,10 +63,6 @@
 
 #define DEBUG_DBERROR(_rc, _fn, _data) do {if (_rc != DBRES_OK) printf("Error in %s: %s\n", _fn, database_errmsg(_data));} while (0)
 
-#if CLOUDSYNC_PAYLOAD_SKIP_SCHEMA_HASH_CHECK
-bool schema_hash_disabled = true;
-#endif
-
 typedef enum {
     CLOUDSYNC_PK_INDEX_TBL = 0,
     CLOUDSYNC_PK_INDEX_PK = 1,
@@ -1208,18 +1204,20 @@ int merge_insert_col (cloudsync_context *data, cloudsync_table_context *table, c
         return rc;
     }
 
-    // bind value
+    // bind value (always bind all expected parameters for correct prepared statement handling)
     if (col_value) {
         rc = databasevm_bind_value(vm, table->npks+1, col_value);
         if (rc == DBRES_OK) rc = databasevm_bind_value(vm, table->npks+2, col_value);
-        if (rc != DBRES_OK) {
-            cloudsync_set_dberror(data);
-            dbvm_reset(vm);
-            return rc;
-        }
-
+    } else {
+        rc = databasevm_bind_null(vm, table->npks+1);
+        if (rc == DBRES_OK) rc = databasevm_bind_null(vm, table->npks+2);
     }
-
+    if (rc != DBRES_OK) {
+        cloudsync_set_dberror(data);
+        dbvm_reset(vm);
+        return rc;
+    }
+
     // perform real operation and disable triggers
 
     // in case of GOS we reused the table->col_merge_stmt statement
@@ -2358,15 +2356,17 @@ int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int b
     header.nrows = ntohl(header.nrows);
     header.schema_hash = ntohll(header.schema_hash);
 
-#if !CLOUDSYNC_PAYLOAD_SKIP_SCHEMA_HASH_CHECK
-    if (!data || header.schema_hash != data->schema_hash) {
-        if (!database_check_schema_hash(data, header.schema_hash)) {
-            char buffer[1024];
-            snprintf(buffer, sizeof(buffer), "Cannot apply the received payload because the schema hash is unknown %llu.", header.schema_hash);
-            return cloudsync_set_error(data, buffer, DBRES_MISUSE);
+    // compare schema_hash only if not disabled and if the received payload was created with the current header version
+    // to avoid schema hash mismatch when processed by a peer with a different extension version during software updates.
+    if (dbutils_settings_get_int64_value(data, CLOUDSYNC_KEY_SKIP_SCHEMA_HASH_CHECK) == 0 && header.version == CLOUDSYNC_PAYLOAD_VERSION_LATEST) {
+        if (header.schema_hash != data->schema_hash) {
+            if (!database_check_schema_hash(data, header.schema_hash)) {
+                char buffer[1024];
+                snprintf(buffer, sizeof(buffer), "Cannot apply the received payload because the schema hash is unknown %llu.", header.schema_hash);
+                return cloudsync_set_error(data, buffer, DBRES_MISUSE);
+            }
         }
     }
-#endif
 
     // sanity check header
     if ((header.signature != CLOUDSYNC_PAYLOAD_SIGNATURE) || (header.ncols == 0)) {
@@ -2539,8 +2539,8 @@ int cloudsync_payload_get (cloudsync_context *data, char **blob, int *blob_size,
 
     // retrieve BLOB
    char sql[1024];
-    snprintf(sql, sizeof(sql), "WITH max_db_version AS (SELECT MAX(db_version) AS max_db_version FROM cloudsync_changes) "
-             "SELECT * FROM (SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) AS payload, max_db_version AS max_db_version, MAX(IIF(db_version = max_db_version, seq, NULL)) FROM cloudsync_changes, max_db_version WHERE site_id=cloudsync_siteid() AND (db_version>%d OR (db_version=%d AND seq>%d))) WHERE payload IS NOT NULL", *db_version, *db_version, *seq);
+    snprintf(sql, sizeof(sql), "WITH max_db_version AS (SELECT MAX(db_version) AS max_db_version FROM cloudsync_changes WHERE site_id=cloudsync_siteid()) "
+             "SELECT * FROM (SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) AS payload, max_db_version AS max_db_version, MAX(IIF(db_version = max_db_version, seq, 0)) FROM cloudsync_changes, max_db_version WHERE site_id=cloudsync_siteid() AND (db_version>%d OR (db_version=%d AND seq>%d))) WHERE payload IS NOT NULL", *db_version, *db_version, *seq);
 
     int64_t len = 0;
    int rc = database_select_blob_2int(data, sql, blob, &len, new_db_version, new_seq);

src/dbutils.h

Lines changed: 1 addition & 0 deletions
@@ -25,6 +25,7 @@
 #define CLOUDSYNC_KEY_SCHEMA "schema"
 #define CLOUDSYNC_KEY_DEBUG "debug"
 #define CLOUDSYNC_KEY_ALGO "algo"
+#define CLOUDSYNC_KEY_SKIP_SCHEMA_HASH_CHECK "skip_schema_hash_check"
 
 // settings
 int dbutils_settings_init (cloudsync_context *data);

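The new skip_schema_hash_check key is read at payload-apply time through dbutils_settings_get_int64_value(); any non-zero value disables the schema-hash comparison in cloudsync_payload_apply(). A minimal sketch of toggling it follows; the cloudsync_settings key/value table name is an assumption made only for illustration, while the key string itself comes from this commit.

-- Hypothetical: 'cloudsync_settings' is an assumed key/value settings table,
-- only the key name 'skip_schema_hash_check' is taken from this change.
INSERT INTO cloudsync_settings (key, value)
VALUES ('skip_schema_hash_check', '1')
ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value;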
src/postgresql/cloudsync--1.0.sql

Lines changed: 18 additions & 0 deletions
@@ -288,3 +288,21 @@ CREATE OR REPLACE FUNCTION cloudsync_table_schema(table_name text)
 RETURNS text
 AS 'MODULE_PATHNAME', 'pg_cloudsync_table_schema'
 LANGUAGE C VOLATILE;
+
+-- ============================================================================
+-- Type Casts
+-- ============================================================================
+
+-- Cast function: converts bigint to boolean (0 = false, non-zero = true)
+-- Required because BOOLEAN values are encoded as INT8 in sync payloads,
+-- but PostgreSQL has no built-in cast from bigint to boolean.
+CREATE FUNCTION cloudsync_int8_to_bool(bigint) RETURNS boolean AS $$
+    SELECT $1 <> 0
+$$ LANGUAGE SQL IMMUTABLE STRICT;
+
+-- ASSIGNMENT cast: auto-applies in INSERT/UPDATE context only
+-- This enables BOOLEAN column sync where values are encoded as INT8.
+-- Using ASSIGNMENT (not IMPLICIT) to avoid unintended conversions in WHERE clauses.
+CREATE CAST (bigint AS boolean)
+    WITH FUNCTION cloudsync_int8_to_bool(bigint)
+    AS ASSIGNMENT;

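As a quick sanity check of the ASSIGNMENT cast semantics above, here is a minimal sketch against a throwaway table (cast_demo is a placeholder name, not part of the extension): the cast fires automatically when an int8 value is assigned to a boolean column, but it does not apply in expression context, so comparisons still need an explicit cast.

-- Illustration only: cast_demo is a placeholder table.
CREATE TEMP TABLE cast_demo (flag boolean);

-- ASSIGNMENT context: the bigint -> boolean cast applies automatically.
INSERT INTO cast_demo (flag) VALUES (1::bigint);  -- stored as true
INSERT INTO cast_demo (flag) VALUES (0::bigint);  -- stored as false

-- Expression context: ASSIGNMENT casts do not apply here, so an explicit cast is required.
SELECT * FROM cast_demo WHERE flag = (1::bigint)::boolean;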
src/postgresql/cloudsync_postgresql.c

Lines changed: 87 additions & 2 deletions
@@ -1757,6 +1757,77 @@ static int cloudsync_decode_value_cb (void *xdata, int index, int type, int64_t
     return DBRES_OK;
 }
 
+// Map a column Oid to the decoded type Oid that would be used for non-NULL values.
+// This ensures NULL and non-NULL values use consistent types for SPI plan caching.
+// The mapping must match pgvalue_dbtype() in pgvalue.c which determines encode/decode types.
+// For example, INT4OID columns decode to INT8OID, UUIDOID columns decode to TEXTOID.
+static Oid map_column_oid_to_decoded_oid(Oid col_oid) {
+    switch (col_oid) {
+        // Integer types → INT8OID (all integers decode to int64)
+        // Must match DBTYPE_INTEGER cases in pgvalue_dbtype()
+        case INT2OID:
+        case INT4OID:
+        case INT8OID:
+        case BOOLOID:   // BOOLEAN encodes/decodes as INTEGER
+        case CHAROID:   // "char" encodes/decodes as INTEGER
+        case OIDOID:    // OID encodes/decodes as INTEGER
+            return INT8OID;
+        // Float types → FLOAT8OID (all floats decode to double)
+        // Must match DBTYPE_FLOAT cases in pgvalue_dbtype()
+        case FLOAT4OID:
+        case FLOAT8OID:
+        case NUMERICOID:
+            return FLOAT8OID;
+        // Binary types → BYTEAOID
+        // Must match DBTYPE_BLOB cases in pgvalue_dbtype()
+        case BYTEAOID:
+            return BYTEAOID;
+        // All other types (text, varchar, uuid, json, date, timestamp, etc.) → TEXTOID
+        // These all encode/decode as DBTYPE_TEXT
+        default:
+            return TEXTOID;
+    }
+}
+
+// Get the Oid of a column from the system catalog.
+// Requires SPI to be connected. Returns InvalidOid if not found.
+static Oid get_column_oid(const char *schema, const char *table_name, const char *column_name) {
+    if (!table_name || !column_name) return InvalidOid;
+
+    const char *query =
+        "SELECT a.atttypid "
+        "FROM pg_attribute a "
+        "JOIN pg_class c ON c.oid = a.attrelid "
+        "LEFT JOIN pg_namespace n ON n.oid = c.relnamespace "
+        "WHERE c.relname = $1 "
+        "AND a.attname = $2 "
+        "AND a.attnum > 0 "
+        "AND NOT a.attisdropped "
+        "AND (n.nspname = $3 OR $3 IS NULL)";
+
+    Oid argtypes[3] = {TEXTOID, TEXTOID, TEXTOID};
+    Datum values[3];
+    char nulls[3] = {' ', ' ', schema ? ' ' : 'n'};
+
+    values[0] = CStringGetTextDatum(table_name);
+    values[1] = CStringGetTextDatum(column_name);
+    values[2] = schema ? CStringGetTextDatum(schema) : (Datum)0;
+
+    int ret = SPI_execute_with_args(query, 3, argtypes, values, nulls, true, 1);
+
+    pfree(DatumGetPointer(values[0]));
+    pfree(DatumGetPointer(values[1]));
+    if (schema) pfree(DatumGetPointer(values[2]));
+
+    if (ret != SPI_OK_SELECT || SPI_processed == 0) return InvalidOid;
+
+    bool isnull;
+    Datum col_oid = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &isnull);
+    if (isnull) return InvalidOid;
+
+    return DatumGetObjectId(col_oid);
+}
+
 // Decode encoded bytea into a pgvalue_t with the decoded base type.
 // Type casting to the target column type is handled by the SQL statement.
 static pgvalue_t *cloudsync_decode_bytea_to_pgvalue (bytea *encoded, bool *out_isnull) {
@@ -2366,9 +2437,23 @@ Datum cloudsync_changes_insert_trigger (PG_FUNCTION_ARGS) {
     if (SPI_connect() != SPI_OK_CONNECT) ereport(ERROR, (errmsg("cloudsync: SPI_connect failed in trigger")));
     spi_connected = true;
 
-    // Decode value to base type; SQL statement handles type casting via $n::typename
+    // Decode value to base type; SQL statement handles type casting via $n::typename.
+    // For non-NULL values, we get the decoded base type (INT8OID for integers, TEXTOID for text/UUID, etc).
+    // For NULL values, we must use the SAME decoded type that non-NULL values would use.
+    // This ensures type consistency across all calls, as SPI caches parameter types on first prepare.
     if (!is_tombstone) {
-        col_value = cloudsync_decode_bytea_to_pgvalue(insert_value_encoded, NULL);
+        bool value_is_null = false;
+        col_value = cloudsync_decode_bytea_to_pgvalue(insert_value_encoded, &value_is_null);
+
+        // When value is NULL, create a typed NULL pgvalue with the decoded type.
+        // We map the column's actual Oid to the corresponding decoded Oid (e.g., INT4OID → INT8OID).
+        if (!col_value && value_is_null) {
+            Oid col_oid = get_column_oid(table_schema(table), insert_tbl, insert_name);
+            if (OidIsValid(col_oid)) {
+                Oid decoded_oid = map_column_oid_to_decoded_oid(col_oid);
+                col_value = pgvalue_create((Datum)0, decoded_oid, -1, InvalidOid, true);
+            }
+        }
     }
 
     int rc = DBRES_OK;

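The catalog lookup embedded in get_column_oid() can be exercised on its own in psql; below is a minimal sketch with placeholder identifiers (public, my_table, and my_column are assumptions, not names from the repository), with the $1..$3 parameters replaced by literals and the result cast to regtype for readability.

-- Same lookup as get_column_oid(), with literals in place of $1..$3.
SELECT a.atttypid::regtype AS column_type
FROM pg_attribute a
JOIN pg_class c ON c.oid = a.attrelid
LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relname = 'my_table'
  AND a.attname = 'my_column'
  AND a.attnum > 0
  AND NOT a.attisdropped
  AND n.nspname = 'public';  -- the C version also accepts a NULL schema via ($3 IS NULL)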