Skip to content

Commit ac5fc52

Browse files
committed
Merge branch 'wip-pg-extension' of https://github.com/sqliteai/sqlite-sync into wip-pg-extension
2 parents 41c7981 + 1c6a193 commit ac5fc52

File tree

4 files changed

+778
-25
lines changed

4 files changed

+778
-25
lines changed

src/postgresql/cloudsync_postgresql.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2176,6 +2176,10 @@ Datum cloudsync_changes_select(PG_FUNCTION_ARGS) {
21762176
SPI_finish();
21772177
st->spi_connected = false;
21782178

2179+
// SPI operations may leave us in multi_call_memory_ctx
2180+
// Must switch to a safe context before SRF_RETURN_DONE deletes it
2181+
MemoryContextSwitchTo(fcinfo->flinfo->fn_mcxt);
2182+
21792183
SRF_RETURN_DONE(funcctx);
21802184
}
21812185

@@ -2199,6 +2203,10 @@ Datum cloudsync_changes_select(PG_FUNCTION_ARGS) {
21992203
}
22002204
PG_CATCH();
22012205
{
2206+
// Switch to function's context (safe, won't be deleted)
2207+
// Avoids assertion if we're currently in multi_call_memory_ctx
2208+
MemoryContextSwitchTo(fcinfo->flinfo->fn_mcxt);
2209+
22022210
if (st_local && st_local->portal) {
22032211
SPI_cursor_close(st_local->portal);
22042212
st_local->portal = NULL;
@@ -2212,7 +2220,7 @@ Datum cloudsync_changes_select(PG_FUNCTION_ARGS) {
22122220
SPI_finish();
22132221
spi_connected_local = false;
22142222
}
2215-
2223+
22162224
PG_RE_THROW();
22172225
}
22182226
PG_END_TRY();

src/postgresql/database_postgresql.c

Lines changed: 71 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -508,19 +508,21 @@ static bool database_system_exists (cloudsync_context *data, const char *name, c
508508
return false;
509509
}
510510

511+
MemoryContext oldcontext = CurrentMemoryContext;
511512
PG_TRY();
512513
{
513514
Oid argtypes[1] = {TEXTOID};
514515
Datum values[1] = {CStringGetTextDatum(name)};
515516
char nulls[1] = { ' ' };
516-
517+
517518
int rc = SPI_execute_with_args(query, 1, argtypes, values, nulls, true, 0);
518519
exists = (rc >= 0 && SPI_processed > 0);
519520
if (SPI_tuptable) SPI_freetuptable(SPI_tuptable);
520521
pfree(DatumGetPointer(values[0]));
521522
}
522523
PG_CATCH();
523524
{
525+
MemoryContextSwitchTo(oldcontext);
524526
ErrorData *edata = CopyErrorData();
525527
cloudsync_set_error(data, edata->message, DBRES_ERROR);
526528
FreeErrorData(edata);
@@ -538,9 +540,10 @@ static bool database_system_exists (cloudsync_context *data, const char *name, c
538540
int database_exec (cloudsync_context *data, const char *sql) {
539541
if (!sql) return cloudsync_set_error(data, "SQL statement is NULL", DBRES_ERROR);
540542
cloudsync_reset_error(data);
541-
543+
542544
int rc;
543545
bool is_error = false;
546+
MemoryContext oldcontext = CurrentMemoryContext;
544547
PG_TRY();
545548
{
546549
rc = SPI_execute(sql, false, 0);
@@ -550,6 +553,7 @@ int database_exec (cloudsync_context *data, const char *sql) {
550553
}
551554
PG_CATCH();
552555
{
556+
MemoryContextSwitchTo(oldcontext);
553557
ErrorData *edata = CopyErrorData();
554558
rc = cloudsync_set_error(data, edata->message, DBRES_ERROR);
555559
FreeErrorData(edata);
@@ -578,12 +582,14 @@ int database_exec_callback (cloudsync_context *data, const char *sql, int (*call
578582

579583
int rc;
580584
bool is_error = false;
585+
MemoryContext oldcontext = CurrentMemoryContext;
581586
PG_TRY();
582587
{
583588
rc = SPI_execute(sql, true, 0);
584589
}
585590
PG_CATCH();
586591
{
592+
MemoryContextSwitchTo(oldcontext);
587593
ErrorData *edata = CopyErrorData();
588594
rc = cloudsync_set_error(data, edata->message, DBRES_ERROR);
589595
FreeErrorData(edata);
@@ -1488,8 +1494,9 @@ int databasevm_prepare (cloudsync_context *data, const char *sql, dbvm_t **vm, i
14881494
pg_stmt_t *stmt = (pg_stmt_t *)cloudsync_memory_zeroalloc(sizeof(pg_stmt_t));
14891495
if (!stmt) return cloudsync_set_error(data, "Not enough memory to allocate a dbvm_t struct", DBRES_NOMEM);
14901496
stmt->data = data;
1491-
1497+
14921498
int rc = DBRES_OK;
1499+
MemoryContext oldcontext = CurrentMemoryContext;
14931500
PG_TRY();
14941501
{
14951502
MemoryContext parent = (flags & DBFLAG_PERSISTENT) ? TopMemoryContext : CurrentMemoryContext;
@@ -1500,13 +1507,14 @@ int databasevm_prepare (cloudsync_context *data, const char *sql, dbvm_t **vm, i
15001507
}
15011508
stmt->bind_mcxt = AllocSetContextCreate(stmt->stmt_mcxt, "cloudsync binds", ALLOCSET_DEFAULT_SIZES);
15021509
stmt->row_mcxt = AllocSetContextCreate(stmt->stmt_mcxt, "cloudsync row", ALLOCSET_DEFAULT_SIZES);
1503-
1510+
15041511
MemoryContext old = MemoryContextSwitchTo(stmt->stmt_mcxt);
15051512
stmt->sql = pstrdup(sql);
15061513
MemoryContextSwitchTo(old);
15071514
}
15081515
PG_CATCH();
15091516
{
1517+
MemoryContextSwitchTo(oldcontext);
15101518
ErrorData *edata = CopyErrorData();
15111519
rc = cloudsync_set_error(data, edata->message, DBRES_ERROR);
15121520
FreeErrorData(edata);
@@ -1526,33 +1534,52 @@ int databasevm_prepare (cloudsync_context *data, const char *sql, dbvm_t **vm, i
15261534

15271535
int databasevm_step0 (pg_stmt_t *stmt) {
15281536
cloudsync_context *data = stmt->data;
1537+
if (!data) return DBRES_ERROR;
1538+
15291539
int rc = DBRES_OK;
1530-
1531-
// prepare plan
1540+
MemoryContext oldcontext = CurrentMemoryContext;
1541+
15321542
PG_TRY();
15331543
{
15341544
if (!stmt || !stmt->sql) {
1535-
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("databasevm_step0 invalid stmt or sql pointer")));
1545+
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
1546+
errmsg("databasevm_step0 invalid stmt or sql pointer")));
15361547
}
1537-
1548+
15381549
stmt->plan = SPI_prepare(stmt->sql, stmt->nparams, stmt->types);
15391550
if (stmt->plan == NULL) {
1540-
rc = cloudsync_set_error(data, "Unable to prepare SQL statement", DBRES_ERROR);
1541-
} else {
1542-
SPI_keepplan(stmt->plan);
1543-
stmt->plan_is_prepared = true;
1551+
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
1552+
errmsg("Unable to prepare SQL statement")));
15441553
}
1554+
1555+
SPI_keepplan(stmt->plan);
1556+
stmt->plan_is_prepared = true;
15451557
}
15461558
PG_CATCH();
15471559
{
1560+
// Switch to safe context for CopyErrorData (can't be ErrorContext)
1561+
MemoryContextSwitchTo(oldcontext);
15481562
ErrorData *edata = CopyErrorData();
1549-
int err = cloudsync_set_error(data, edata->message, DBRES_ERROR);
1563+
rc = cloudsync_set_error(data, edata->message, DBRES_ERROR);
15501564
FreeErrorData(edata);
15511565
FlushErrorState();
1552-
rc = err;
1566+
1567+
// Clean up partially prepared plan if needed
1568+
if (stmt->plan != NULL && !stmt->plan_is_prepared) {
1569+
PG_TRY();
1570+
{
1571+
SPI_freeplan(stmt->plan);
1572+
}
1573+
PG_CATCH();
1574+
{
1575+
FlushErrorState(); // Swallow errors during cleanup
1576+
}
1577+
PG_END_TRY();
1578+
stmt->plan = NULL;
1579+
}
15531580
}
15541581
PG_END_TRY();
1555-
1582+
15561583
return rc;
15571584
}
15581585

@@ -1568,8 +1595,9 @@ int databasevm_step (dbvm_t *vm) {
15681595
if (rc != DBRES_OK) return rc;
15691596
}
15701597
if (!stmt->plan_is_prepared || !stmt->plan) return DBRES_ERROR;
1571-
1598+
15721599
int rc = DBRES_DONE;
1600+
MemoryContext oldcontext = CurrentMemoryContext;
15731601
PG_TRY();
15741602
{
15751603
do {
@@ -1671,6 +1699,7 @@ int databasevm_step (dbvm_t *vm) {
16711699
}
16721700
PG_CATCH();
16731701
{
1702+
MemoryContextSwitchTo(oldcontext);
16741703
ErrorData *edata = CopyErrorData();
16751704
int err = cloudsync_set_error(data, edata->message, DBRES_ERROR);
16761705
FreeErrorData(edata);
@@ -1719,12 +1748,18 @@ void databasevm_finalize (dbvm_t *vm) {
17191748
void databasevm_reset (dbvm_t *vm) {
17201749
if (!vm) return;
17211750
pg_stmt_t *stmt = (pg_stmt_t*)vm;
1751+
1752+
// Close any open cursor and clear fetched data
17221753
clear_fetch_batch(stmt);
17231754
close_portal(stmt);
1755+
1756+
// Clear global SPI tuple table if any
17241757
if (SPI_tuptable) {
17251758
SPI_freetuptable(SPI_tuptable);
17261759
SPI_tuptable = NULL;
17271760
}
1761+
1762+
// Reset execution state
17281763
stmt->executed_nonselect = false;
17291764

17301765
// Reset parameter values but keep the plan, types, and nparams intact.
@@ -1740,7 +1775,7 @@ void databasevm_reset (dbvm_t *vm) {
17401775
void databasevm_clear_bindings (dbvm_t *vm) {
17411776
if (!vm) return;
17421777
pg_stmt_t *stmt = (pg_stmt_t*)vm;
1743-
1778+
17441779
clear_fetch_batch(stmt);
17451780
close_portal(stmt);
17461781
if (SPI_tuptable) {
@@ -1754,11 +1789,15 @@ void databasevm_clear_bindings (dbvm_t *vm) {
17541789
stmt->plan_is_prepared = false;
17551790
}
17561791

1792+
// DO NOT call clear_fetch_batch() - not related to bindings
1793+
// DO NOT call close_portal() - not related to bindings
1794+
// DO NOT free the plan - clearing bindings != destroying prepared statement
1795+
1796+
// Only clear the bound parameter values
17571797
if (stmt->bind_mcxt) MemoryContextReset(stmt->bind_mcxt);
17581798
stmt->nparams = 0;
1759-
stmt->executed_nonselect = false;
1760-
1761-
// initialize static array of params
1799+
1800+
// Reset params array to defaults
17621801
for (int i = 0; i < MAX_PARAMS; i++) {
17631802
stmt->types[i] = UNKNOWNOID;
17641803
stmt->values[i] = (Datum) 0;
@@ -2288,22 +2327,24 @@ static int database_refresh_snapshot (void) {
22882327
if (!IsTransactionState()) {
22892328
return DBRES_OK; // Not in transaction, nothing to do
22902329
}
2291-
2330+
2331+
MemoryContext oldcontext = CurrentMemoryContext;
22922332
PG_TRY();
22932333
{
22942334
CommandCounterIncrement();
2295-
2335+
22962336
// Pop existing snapshot if any
22972337
if (ActiveSnapshotSet()) {
22982338
PopActiveSnapshot();
22992339
}
2300-
2340+
23012341
// Push fresh snapshot
23022342
PushActiveSnapshot(GetTransactionSnapshot());
23032343
}
23042344
PG_CATCH();
23052345
{
23062346
// Snapshot refresh failed - log warning but don't fail operation
2347+
MemoryContextSwitchTo(oldcontext);
23072348
ErrorData *edata = CopyErrorData();
23082349
elog(WARNING, "refresh_snapshot_after_command failed: %s", edata->message);
23092350
FreeErrorData(edata);
@@ -2319,12 +2360,14 @@ int database_begin_savepoint (cloudsync_context *data, const char *savepoint_nam
23192360
cloudsync_reset_error(data);
23202361
int rc = DBRES_OK;
23212362

2363+
MemoryContext oldcontext = CurrentMemoryContext;
23222364
PG_TRY();
23232365
{
23242366
BeginInternalSubTransaction(NULL);
23252367
}
23262368
PG_CATCH();
23272369
{
2370+
MemoryContextSwitchTo(oldcontext);
23282371
ErrorData *edata = CopyErrorData();
23292372
rc = cloudsync_set_error(data, edata->message, DBRES_ERROR);
23302373
FreeErrorData(edata);
@@ -2339,13 +2382,15 @@ int database_commit_savepoint (cloudsync_context *data, const char *savepoint_na
23392382
cloudsync_reset_error(data);
23402383
int rc = DBRES_OK;
23412384

2385+
MemoryContext oldcontext = CurrentMemoryContext;
23422386
PG_TRY();
23432387
{
23442388
ReleaseCurrentSubTransaction();
23452389
database_refresh_snapshot();
23462390
}
23472391
PG_CATCH();
23482392
{
2393+
MemoryContextSwitchTo(oldcontext);
23492394
ErrorData *edata = CopyErrorData();
23502395
cloudsync_set_error(data, edata->message, DBRES_ERROR);
23512396
FreeErrorData(edata);
@@ -2360,14 +2405,16 @@ int database_commit_savepoint (cloudsync_context *data, const char *savepoint_na
23602405
int database_rollback_savepoint (cloudsync_context *data, const char *savepoint_name) {
23612406
cloudsync_reset_error(data);
23622407
int rc = DBRES_OK;
2363-
2408+
2409+
MemoryContext oldcontext = CurrentMemoryContext;
23642410
PG_TRY();
23652411
{
23662412
RollbackAndReleaseCurrentSubTransaction();
23672413
database_refresh_snapshot();
23682414
}
23692415
PG_CATCH();
23702416
{
2417+
MemoryContextSwitchTo(oldcontext);
23712418
ErrorData *edata = CopyErrorData();
23722419
cloudsync_set_error(data, edata->message, DBRES_ERROR);
23732420
FreeErrorData(edata);

0 commit comments

Comments
 (0)