5050#define CLOUDSYNC_MIN_DB_VERSION 0
5151
5252#define CLOUDSYNC_PAYLOAD_SKIP_SCHEMA_HASH_CHECK 1
53- #define CLOUDSYNC_PAYLOAD_MINBUF_SIZE 512*1024
54- #define CLOUDSYNC_PAYLOAD_SIGNATURE 'CLSY'
53+ #define CLOUDSYNC_PAYLOAD_MINBUF_SIZE ( 512*1024)
54+ #define CLOUDSYNC_PAYLOAD_SIGNATURE 0x434C5359 /* 'C','L','S','Y' */
5555#define CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL 1
5656#define CLOUDSYNC_PAYLOAD_VERSION_1 CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL
5757#define CLOUDSYNC_PAYLOAD_VERSION_2 2
@@ -978,7 +978,7 @@ bool table_add_to_context (cloudsync_context *data, table_algo algo, const char
978978 }
979979
980980 int ncols = database_count_nonpk (data , table_name );
981- if (count < 0 ) {cloudsync_set_dberror (data ); goto abort_add_table ;}
981+ if (ncols < 0 ) {cloudsync_set_dberror (data ); goto abort_add_table ;}
982982 int rc = table_add_stmts (table , ncols );
983983 if (rc != DBRES_OK ) goto abort_add_table ;
984984
@@ -1508,6 +1508,9 @@ void cloudsync_context_free (void *ctx) {
15081508 DEBUG_SETTINGS ("cloudsync_context_free %p" , data );
15091509 if (!data ) return ;
15101510
1511+ // free all table contexts and prepared statements
1512+ cloudsync_terminate (data );
1513+
15111514 cloudsync_memory_free (data -> tables );
15121515 cloudsync_memory_free (data );
15131516}
@@ -1615,7 +1618,7 @@ int cloudsync_begin_alter (cloudsync_context *data, const char *table_name) {
16151618 }
16161619
16171620 // drop original triggers
1618- database_delete_triggers (data , table_name );
1621+ rc = database_delete_triggers (data , table_name );
16191622 if (rc != DBRES_OK ) {
16201623 char buffer [1024 ];
16211624 snprintf (buffer , sizeof (buffer ), "Unable to delete triggers for table %s in cloudsync_begin_alter." , table_name );
@@ -2075,7 +2078,7 @@ int cloudsync_payload_encode_step (cloudsync_payload_context *payload, cloudsync
20752078 char * buffer = payload -> buffer + payload -> bused ;
20762079 size_t bsize = payload -> balloc - payload -> bused ;
20772080 char * p = pk_encode ((dbvalue_t * * )argv , argc , buffer , false, & bsize , data -> skip_decode_idx );
2078- if (!p ) cloudsync_set_error (data , "An error occurred while encoding payload" , DBRES_ERROR );
2081+ if (!p ) return cloudsync_set_error (data , "An error occurred while encoding payload" , DBRES_ERROR );
20792082
20802083 // update buffer
20812084 payload -> bused += breq ;
@@ -2224,6 +2227,9 @@ static int cloudsync_payload_decode_callback (void *xdata, int index, int type,
22242227// #ifndef CLOUDSYNC_OMIT_RLS_VALIDATION
22252228
22262229int cloudsync_payload_apply (cloudsync_context * data , const char * payload , int blen , int * pnrows ) {
2230+ // sanity check
2231+ if (blen < (int )sizeof (cloudsync_payload_header )) return cloudsync_set_error (data , "Error on cloudsync_payload_apply: invalid payload length" , DBRES_MISUSE );
2232+
22272233 // decode header
22282234 cloudsync_payload_header header ;
22292235 memcpy (& header , payload , sizeof (cloudsync_payload_header ));
@@ -2250,30 +2256,30 @@ int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int b
22502256 }
22512257
22522258 const char * buffer = payload + sizeof (cloudsync_payload_header );
2253- blen -= sizeof (cloudsync_payload_header );
2254-
2259+ size_t buf_len = ( size_t ) blen - sizeof (cloudsync_payload_header );
2260+
22552261 // sanity check checksum (only if version is >= 2)
22562262 if (header .version >= CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM ) {
2257- uint64_t checksum = pk_checksum (buffer , blen );
2263+ uint64_t checksum = pk_checksum (buffer , buf_len );
22582264 if (cloudsync_payload_checksum_verify (& header , checksum ) == false) {
22592265 return cloudsync_set_error (data , "Error on cloudsync_payload_apply: invalid checksum" , DBRES_MISUSE );
22602266 }
22612267 }
2262-
2268+
22632269 // check if payload is compressed
22642270 char * clone = NULL ;
22652271 if (header .expanded_size != 0 ) {
22662272 clone = (char * )cloudsync_memory_alloc (header .expanded_size );
22672273 if (!clone ) return cloudsync_set_error (data , "Unable to allocate memory to uncompress payload" , DBRES_NOMEM );
2268-
2269- uint32_t rc = LZ4_decompress_safe (buffer , clone , blen , header .expanded_size );
2270- if (rc <= 0 || rc != header .expanded_size ) {
2274+
2275+ int lz4_rc = LZ4_decompress_safe (buffer , clone , ( int ) buf_len , ( int ) header .expanded_size );
2276+ if (lz4_rc <= 0 || ( uint32_t ) lz4_rc != header .expanded_size ) {
22712277 if (clone ) cloudsync_memory_free (clone );
22722278 return cloudsync_set_error (data , "Error on cloudsync_payload_apply: unable to decompress BLOB" , DBRES_MISUSE );
22732279 }
2274-
2280+
22752281 buffer = (const char * )clone ;
2276- blen = header .expanded_size ;
2282+ buf_len = ( size_t ) header .expanded_size ;
22772283 }
22782284
22792285 // precompile the insert statement
@@ -2298,7 +2304,7 @@ int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int b
22982304
22992305 for (uint32_t i = 0 ; i < nrows ; ++ i ) {
23002306 size_t seek = 0 ;
2301- int res = pk_decode ((char * )buffer , blen , ncols , & seek , data -> skip_decode_idx , cloudsync_payload_decode_callback , & decoded_context );
2307+ int res = pk_decode ((char * )buffer , buf_len , ncols , & seek , data -> skip_decode_idx , cloudsync_payload_decode_callback , & decoded_context );
23022308 if (res == -1 ) {
23032309 if (in_savepoint ) database_rollback_savepoint (data , "cloudsync_payload_apply" );
23042310 rc = DBRES_ERROR ;
@@ -2356,7 +2362,7 @@ int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int b
23562362 }
23572363
23582364 buffer += seek ;
2359- blen -= seek ;
2365+ buf_len -= seek ;
23602366 dbvm_reset (vm );
23612367 }
23622368
@@ -2424,7 +2430,7 @@ int cloudsync_payload_get (cloudsync_context *data, char **blob, int *blob_size,
24242430 if (rc != DBRES_OK ) return rc ;
24252431
24262432 // exit if there is no data to send
2427- if (blob == NULL || blob_size == 0 ) return DBRES_OK ;
2433+ if (blob == NULL || * blob_size == 0 ) return DBRES_OK ;
24282434 return rc ;
24292435}
24302436
@@ -2567,7 +2573,7 @@ int cloudsync_cleanup_internal (cloudsync_context *data, cloudsync_table_context
25672573 }
25682574
25692575 // drop original triggers
2570- database_delete_triggers (data , table_name );
2576+ rc = database_delete_triggers (data , table_name );
25712577 if (rc != DBRES_OK ) {
25722578 char buffer [1024 ];
25732579 snprintf (buffer , sizeof (buffer ), "Unable to delete triggers for table %s" , table_name );
@@ -2666,6 +2672,13 @@ int cloudsync_init_table (cloudsync_context *data, const char *table_name, const
26662672 snprintf (buffer , sizeof (buffer ), "Unknown CRDT algorithm name %s" , algo_name );
26672673 return cloudsync_set_error (data , buffer , DBRES_ERROR );
26682674 }
2675+
2676+ // DWS and AWS algorithms are not yet implemented in the merge logic
2677+ if (algo_new == table_algo_crdt_dws || algo_new == table_algo_crdt_aws ) {
2678+ char buffer [1024 ];
2679+ snprintf (buffer , sizeof (buffer ), "CRDT algorithm %s is not yet supported" , algo_name );
2680+ return cloudsync_set_error (data , buffer , DBRES_ERROR );
2681+ }
26692682
26702683 // check if table name was already augmented
26712684 table_algo algo_current = dbutils_table_settings_get_algo (data , table_name );
0 commit comments