Skip to content

Commit 073f4f3

Browse files
committed
Added the ability to perform a sync only if a column expression is satisfied
1 parent 58ce9a4 commit 073f4f3

File tree

16 files changed

+754
-40
lines changed

16 files changed

+754
-40
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,5 @@ jniLibs/
4848

4949
# System
5050
.DS_Store
51-
Thumbs.db
51+
Thumbs.db
52+
CLAUDE.md

src/cloudsync.c

Lines changed: 107 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1794,13 +1794,104 @@ int cloudsync_commit_alter (cloudsync_context *data, const char *table_name) {
17941794
return rc;
17951795
}
17961796

1797+
// MARK: - Filter Rewrite -
1798+
1799+
// Replace bare column names in a filter expression with prefix-qualified names.
1800+
// E.g., filter="user_id = 42", prefix="NEW", columns=["user_id","id"] → "NEW.\"user_id\" = 42"
1801+
// Column names are matched against whole identifier tokens (exact length), so the order of `columns` does not matter.
1802+
// Skips content inside single-quoted string literals.
1803+
// Returns a newly allocated string (caller must free with cloudsync_memory_free), or NULL on error.
1804+
// Helper: check if an identifier token matches a column name.
1805+
// Helper: report whether an identifier token names one of the given columns.
//
// token      start of the token (need not be NUL-terminated at token_len)
// token_len  number of bytes in the token
// columns    array of ncols NUL-terminated column names; NULL entries are skipped
// ncols      number of entries in columns
//
// The comparison is case-sensitive and requires an exact length match, so a
// token is never matched against a mere prefix of a column name (or vice versa).
// Returns false when token or columns is NULL.
static bool filter_is_column (const char *token, size_t token_len, char **columns, int ncols) {
    if (!token || !columns) return false;

    for (int i = 0; i < ncols; ++i) {
        const char *name = columns[i];
        if (!name) continue; // tolerate holes in the array instead of crashing in strlen
        if (strlen(name) == token_len && strncmp(token, name, token_len) == 0)
            return true;
    }
    return false;
}
1812+
1813+
// Helper: check if character is part of a SQL identifier.
1814+
// Helper: true when c may appear in an unquoted SQL identifier as recognised
// by the filter tokenizer, i.e. an ASCII letter, an ASCII digit or '_'.
static bool filter_is_ident_char (char c) {
    if (c == '_') return true;
    if (c >= '0' && c <= '9') return true;
    if (c >= 'A' && c <= 'Z') return true;
    return c >= 'a' && c <= 'z';
}
1818+
1819+
// Rewrite a filter expression so that references to the given columns are
// qualified with a row prefix and double-quoted.
// E.g. filter="user_id = 42", prefix="NEW", columns=["user_id","id"] -> NEW."user_id" = 42
//
// Tokenization rules (single pass over the input):
//   - single-quoted string literals are copied verbatim ('' is an escaped quote);
//   - double-quoted identifiers are treated as one token: "col" becomes PREFIX."col"
//     when col is a known column, otherwise the quoted text is copied verbatim;
//   - bare identifiers ([A-Za-z0-9_]+) that exactly match a column become PREFIX."col";
//   - a token immediately preceded by '.' is considered already qualified
//     (e.g. NEW.user_id) and is never prefixed again;
//   - everything else is copied as-is.
// Column matching is case-sensitive and independent of the order of `columns`.
//
// Returns a newly allocated NUL-terminated string obtained from
// cloudsync_memory_alloc (caller must free with cloudsync_memory_free),
// or NULL on invalid arguments or allocation failure.
char *cloudsync_filter_add_row_prefix (const char *filter, const char *prefix, char **columns, int ncols) {
    if (!filter || !prefix || !columns || ncols <= 0) return NULL;

    size_t filter_len = strlen(filter);
    size_t prefix_len = strlen(prefix);

    // Each rewritten reference grows by at most (prefix_len + 3) bytes: the prefix,
    // a '.', and two double quotes (an already quoted reference only needs the '.').
    // Two references are always separated by at least one other character, so there
    // can be at most (filter_len / 2 + 1) of them. Use that as a safe upper bound.
    size_t max_growth = (filter_len / 2 + 1) * (prefix_len + 3);
    size_t cap = filter_len + max_growth + 64;
    char *result = (char *)cloudsync_memory_alloc(cap);
    if (!result) return NULL;
    size_t out = 0;

    size_t i = 0;
    while (i < filter_len) {
        // Single-quoted string literal: copy verbatim (handle '' escape)
        if (filter[i] == '\'') {
            result[out++] = filter[i++];
            while (i < filter_len) {
                if (filter[i] == '\'') {
                    result[out++] = filter[i++];
                    // '' is an escaped quote, keep going
                    if (i < filter_len && filter[i] == '\'') {
                        result[out++] = filter[i++];
                        continue;
                    }
                    break; // single ' ends the literal
                }
                result[out++] = filter[i++];
            }
            continue;
        }

        // Double-quoted identifier: handle as a single token so its content is never
        // rewritten from the inside (which would produce "PREFIX."col"").
        if (filter[i] == '"') {
            size_t start = i++;      // index of the opening quote
            bool closed = false;
            bool has_escape = false; // "" inside the identifier
            while (i < filter_len) {
                if (filter[i] == '"') {
                    if (i + 1 < filter_len && filter[i + 1] == '"') {
                        has_escape = true;
                        i += 2;
                        continue;
                    }
                    ++i;
                    closed = true;
                    break;
                }
                ++i;
            }
            size_t span_len = i - start; // includes the quotes that were found
            bool qualified = (start > 0 && filter[start - 1] == '.');

            // Only a well-formed, escape-free, not yet qualified identifier is compared
            // against the column list; anything else is passed through untouched.
            if (closed && !has_escape && !qualified && span_len > 2 &&
                filter_is_column(&filter[start + 1], span_len - 2, columns, ncols)) {
                memcpy(&result[out], prefix, prefix_len); out += prefix_len;
                result[out++] = '.';
            }
            memcpy(&result[out], &filter[start], span_len); out += span_len;
            continue;
        }

        // Bare identifier token
        if (filter_is_ident_char(filter[i])) {
            size_t start = i;
            while (i < filter_len && filter_is_ident_char(filter[i])) ++i;
            size_t token_len = i - start;
            bool qualified = (start > 0 && filter[start - 1] == '.');

            if (!qualified && filter_is_column(&filter[start], token_len, columns, ncols)) {
                // Emit PREFIX."column_name"
                memcpy(&result[out], prefix, prefix_len); out += prefix_len;
                result[out++] = '.';
                result[out++] = '"';
                memcpy(&result[out], &filter[start], token_len); out += token_len;
                result[out++] = '"';
            } else {
                // Not a column (or already qualified): copy as-is
                memcpy(&result[out], &filter[start], token_len); out += token_len;
            }
            continue;
        }

        // Any other character: copy as-is
        result[out++] = filter[i++];
    }

    result[out] = '\0';
    return result;
}
1882+
17971883
int cloudsync_refill_metatable (cloudsync_context *data, const char *table_name) {
17981884
cloudsync_table_context *table = table_lookup(data, table_name);
17991885
if (!table) return DBRES_ERROR;
1800-
1886+
18011887
dbvm_t *vm = NULL;
18021888
int64_t db_version = cloudsync_dbversion_next(data, CLOUDSYNC_VALUE_NOTSET);
18031889

1890+
// Read row-level filter from settings (if any)
1891+
char filter_buf[2048];
1892+
int frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", filter_buf, sizeof(filter_buf));
1893+
const char *filter = (frc == DBRES_OK && filter_buf[0]) ? filter_buf : NULL;
1894+
18041895
const char *schema = table->schema ? table->schema : "";
18051896
char *sql = sql_build_pk_collist_query(schema, table_name);
18061897
char *pkclause_identifiers = NULL;
@@ -1810,18 +1901,22 @@ int cloudsync_refill_metatable (cloudsync_context *data, const char *table_name)
18101901
char *pkvalues_identifiers = (pkclause_identifiers) ? pkclause_identifiers : "rowid";
18111902

18121903
// Use database-specific query builder to handle type differences in composite PKs
1813-
sql = sql_build_insert_missing_pks_query(schema, table_name, pkvalues_identifiers, table->base_ref, table->meta_ref);
1904+
sql = sql_build_insert_missing_pks_query(schema, table_name, pkvalues_identifiers, table->base_ref, table->meta_ref, filter);
18141905
if (!sql) {rc = DBRES_NOMEM; goto finalize;}
18151906
rc = database_exec(data, sql);
18161907
cloudsync_memory_free(sql);
18171908
if (rc != DBRES_OK) goto finalize;
1818-
1909+
18191910
// fill missing colums
18201911
// for each non-pk column:
18211912
// The new query does 1 encode per source row and one indexed NOT-EXISTS probe.
1822-
// The old plan does many decodes per candidate and can’t use an index to rule out matches quickly—so it burns CPU and I/O.
1823-
1824-
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL, pkvalues_identifiers, table->base_ref, table->meta_ref);
1913+
// The old plan does many decodes per candidate and can't use an index to rule out matches quickly—so it burns CPU and I/O.
1914+
1915+
if (filter) {
1916+
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL_FILTERED, pkvalues_identifiers, table->base_ref, filter, table->meta_ref);
1917+
} else {
1918+
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL, pkvalues_identifiers, table->base_ref, table->meta_ref);
1919+
}
18251920
rc = databasevm_prepare(data, sql, (void **)&vm, DBFLAG_PERSISTENT);
18261921
cloudsync_memory_free(sql);
18271922
if (rc != DBRES_OK) goto finalize;
@@ -2723,8 +2818,13 @@ int cloudsync_init_table (cloudsync_context *data, const char *table_name, const
27232818
// sync algo with table (unused in this version)
27242819
// cloudsync_sync_table_key(data, table_name, "*", CLOUDSYNC_KEY_ALGO, crdt_algo_name(algo_new));
27252820

2821+
// read row-level filter from settings (if any)
2822+
char init_filter_buf[2048];
2823+
int init_frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", init_filter_buf, sizeof(init_filter_buf));
2824+
const char *init_filter = (init_frc == DBRES_OK && init_filter_buf[0]) ? init_filter_buf : NULL;
2825+
27262826
// check triggers
2727-
rc = database_create_triggers(data, table_name, algo_new);
2827+
rc = database_create_triggers(data, table_name, algo_new, init_filter);
27282828
if (rc != DBRES_OK) return cloudsync_set_error(data, "An error occurred while creating triggers", DBRES_MISUSE);
27292829

27302830
// check meta-table

src/cloudsync.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
extern "C" {
1818
#endif
1919

20-
#define CLOUDSYNC_VERSION "0.9.101"
20+
#define CLOUDSYNC_VERSION "0.9.110"
2121
#define CLOUDSYNC_MAX_TABLENAME_LEN 512
2222

2323
#define CLOUDSYNC_VALUE_NOTSET -1
@@ -121,6 +121,9 @@ int local_update_move_meta (cloudsync_table_context *table, const char *pk, size
121121
int merge_insert_col (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *col_name, dbvalue_t *col_value, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid);
122122
int merge_insert (cloudsync_context *data, cloudsync_table_context *table, const char *insert_pk, int insert_pk_len, int64_t insert_cl, const char *insert_name, dbvalue_t *insert_value, int64_t insert_col_version, int64_t insert_db_version, const char *insert_site_id, int insert_site_id_len, int64_t insert_seq, int64_t *rowid);
123123

124+
// filter rewrite
125+
char *cloudsync_filter_add_row_prefix(const char *filter, const char *prefix, char **columns, int ncols);
126+
124127
// decode bind context
125128
char *cloudsync_pk_context_tbl (cloudsync_pk_decode_bind_context *ctx, int64_t *tbl_len);
126129
void *cloudsync_pk_context_pk (cloudsync_pk_decode_bind_context *ctx, int64_t *pk_len);

src/database.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ bool database_table_exists (cloudsync_context *data, const char *table_name, con
7070
bool database_internal_table_exists (cloudsync_context *data, const char *name);
7171
bool database_trigger_exists (cloudsync_context *data, const char *table_name);
7272
int database_create_metatable (cloudsync_context *data, const char *table_name);
73-
int database_create_triggers (cloudsync_context *data, const char *table_name, table_algo algo);
73+
int database_create_triggers (cloudsync_context *data, const char *table_name, table_algo algo, const char *filter);
7474
int database_delete_triggers (cloudsync_context *data, const char *table_name);
7575
int database_pk_names (cloudsync_context *data, const char *table_name, char ***names, int *count);
7676
int database_cleanup (cloudsync_context *data);
@@ -148,7 +148,7 @@ char *sql_build_delete_cols_not_in_schema_query(const char *schema, const char *
148148
char *sql_build_pk_collist_query(const char *schema, const char *table_name);
149149
char *sql_build_pk_decode_selectlist_query(const char *schema, const char *table_name);
150150
char *sql_build_pk_qualified_collist_query(const char *schema, const char *table_name);
151-
char *sql_build_insert_missing_pks_query(const char *schema, const char *table_name, const char *pkvalues_identifiers, const char *base_ref, const char *meta_ref);
151+
char *sql_build_insert_missing_pks_query(const char *schema, const char *table_name, const char *pkvalues_identifiers, const char *base_ref, const char *meta_ref, const char *filter);
152152

153153
char *database_table_schema(const char *table_name);
154154
char *database_build_meta_ref(const char *schema, const char *table_name);

src/dbutils.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,10 @@ int dbutils_settings_table_load_callback (void *xdata, int ncols, char **values,
363363
if (strcmp(key, "algo")!=0) continue;
364364

365365
table_algo algo = cloudsync_algo_from_name(value);
366-
if (database_create_triggers(data, table_name, algo) != DBRES_OK) return DBRES_MISUSE;
366+
char fbuf[2048];
367+
int frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", fbuf, sizeof(fbuf));
368+
const char *filt = (frc == DBRES_OK && fbuf[0]) ? fbuf : NULL;
369+
if (database_create_triggers(data, table_name, algo, filt) != DBRES_OK) return DBRES_MISUSE;
367370
if (table_add_to_context(data, algo, table_name) == false) return DBRES_MISUSE;
368371

369372
DEBUG_SETTINGS("load tbl_name: %s value: %s", key, value);

src/postgresql/cloudsync--1.0.sql

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,18 @@ RETURNS boolean
102102
AS 'MODULE_PATHNAME', 'cloudsync_set_table'
103103
LANGUAGE C VOLATILE;
104104

105+
-- Set row-level filter for conditional sync
106+
CREATE OR REPLACE FUNCTION cloudsync_set_filter(table_name text, filter_expr text)
107+
RETURNS boolean
108+
AS 'MODULE_PATHNAME', 'cloudsync_set_filter'
109+
LANGUAGE C VOLATILE;
110+
111+
-- Clear row-level filter
112+
CREATE OR REPLACE FUNCTION cloudsync_clear_filter(table_name text)
113+
RETURNS boolean
114+
AS 'MODULE_PATHNAME', 'cloudsync_clear_filter'
115+
LANGUAGE C VOLATILE;
116+
105117
-- Set column-level configuration
106118
CREATE OR REPLACE FUNCTION cloudsync_set_column(table_name text, column_name text, key text, value text)
107119
RETURNS boolean

src/postgresql/cloudsync_postgresql.c

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -610,6 +610,125 @@ Datum cloudsync_set_column (PG_FUNCTION_ARGS) {
610610
PG_RETURN_BOOL(true);
611611
}
612612

613+
// MARK: - Row Filter -
614+
615+
// cloudsync_set_filter - Set a row-level filter for conditional sync
616+
PG_FUNCTION_INFO_V1(cloudsync_set_filter);
617+
Datum cloudsync_set_filter (PG_FUNCTION_ARGS) {
618+
if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) {
619+
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
620+
errmsg("cloudsync_set_filter: table and filter expression required")));
621+
}
622+
623+
const char *tbl = text_to_cstring(PG_GETARG_TEXT_PP(0));
624+
const char *filter_expr = text_to_cstring(PG_GETARG_TEXT_PP(1));
625+
626+
cloudsync_context *data = get_cloudsync_context();
627+
bool spi_connected = false;
628+
629+
int spi_rc = SPI_connect();
630+
if (spi_rc != SPI_OK_CONNECT) {
631+
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("SPI_connect failed: %d", spi_rc)));
632+
}
633+
spi_connected = true;
634+
635+
PG_TRY();
636+
{
637+
// Store filter in table settings
638+
dbutils_table_settings_set_key_value(data, tbl, "*", "filter", filter_expr);
639+
640+
// Read current algo
641+
table_algo algo = dbutils_table_settings_get_algo(data, tbl);
642+
if (algo == table_algo_none) algo = table_algo_crdt_cls;
643+
644+
// Drop triggers
645+
database_delete_triggers(data, tbl);
646+
647+
// Reconnect SPI so that the catalog changes from DROP are visible
648+
SPI_finish();
649+
spi_connected = false;
650+
spi_rc = SPI_connect();
651+
if (spi_rc != SPI_OK_CONNECT) {
652+
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("SPI_connect failed: %d", spi_rc)));
653+
}
654+
spi_connected = true;
655+
656+
// Recreate triggers with filter
657+
int rc = database_create_triggers(data, tbl, algo, filter_expr);
658+
if (rc != DBRES_OK) {
659+
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
660+
errmsg("cloudsync_set_filter: error recreating triggers")));
661+
}
662+
}
663+
PG_CATCH();
664+
{
665+
if (spi_connected) SPI_finish();
666+
PG_RE_THROW();
667+
}
668+
PG_END_TRY();
669+
670+
if (spi_connected) SPI_finish();
671+
PG_RETURN_BOOL(true);
672+
}
673+
674+
// cloudsync_clear_filter - Remove the row-level filter for a table
675+
PG_FUNCTION_INFO_V1(cloudsync_clear_filter);
676+
Datum cloudsync_clear_filter (PG_FUNCTION_ARGS) {
677+
if (PG_ARGISNULL(0)) {
678+
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
679+
errmsg("cloudsync_clear_filter: table name required")));
680+
}
681+
682+
const char *tbl = text_to_cstring(PG_GETARG_TEXT_PP(0));
683+
684+
cloudsync_context *data = get_cloudsync_context();
685+
bool spi_connected = false;
686+
687+
int spi_rc = SPI_connect();
688+
if (spi_rc != SPI_OK_CONNECT) {
689+
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("SPI_connect failed: %d", spi_rc)));
690+
}
691+
spi_connected = true;
692+
693+
PG_TRY();
694+
{
695+
// Remove filter from settings
696+
dbutils_table_settings_set_key_value(data, tbl, "*", "filter", NULL);
697+
698+
// Read current algo
699+
table_algo algo = dbutils_table_settings_get_algo(data, tbl);
700+
if (algo == table_algo_none) algo = table_algo_crdt_cls;
701+
702+
// Drop triggers
703+
database_delete_triggers(data, tbl);
704+
705+
// Reconnect SPI so that the catalog changes from DROP are visible
706+
SPI_finish();
707+
spi_connected = false;
708+
spi_rc = SPI_connect();
709+
if (spi_rc != SPI_OK_CONNECT) {
710+
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("SPI_connect failed: %d", spi_rc)));
711+
}
712+
spi_connected = true;
713+
714+
// Recreate triggers without filter
715+
int rc = database_create_triggers(data, tbl, algo, NULL);
716+
if (rc != DBRES_OK) {
717+
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
718+
errmsg("cloudsync_clear_filter: error recreating triggers")));
719+
}
720+
}
721+
PG_CATCH();
722+
{
723+
if (spi_connected) SPI_finish();
724+
PG_RE_THROW();
725+
}
726+
PG_END_TRY();
727+
728+
if (spi_connected) SPI_finish();
729+
PG_RETURN_BOOL(true);
730+
}
731+
613732
// MARK: - Schema Alteration -
614733

615734
// cloudsync_begin_alter - Begin schema alteration

0 commit comments

Comments
 (0)