Skip to content

Commit 4ed28cf

Browse files
committed
PGPRO-1449: Add MERGE command: merge PAGE with FULL backup
1 parent e7d088c commit 4ed28cf

File tree

18 files changed

+1658
-548
lines changed

18 files changed

+1658
-548
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ OBJS = src/backup.o src/catalog.o src/configure.o src/data.o \
55
src/util.o src/validate.o src/datapagemap.o src/parsexlog.o \
66
src/xlogreader.o src/streamutil.o src/receivelog.o \
77
src/archive.o src/utils/parray.o src/utils/pgut.o src/utils/logger.o \
8-
src/utils/json.o src/utils/thread.o
8+
src/utils/json.o src/utils/thread.o src/merge.o
99

1010
EXTRA_CLEAN = src/datapagemap.c src/datapagemap.h src/xlogreader.c \
1111
src/receivelog.c src/receivelog.h src/streamutil.c src/streamutil.h src/logging.h

src/backup.c

Lines changed: 21 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@ static int checkpoint_timeout(void);
104104

105105
//static void backup_list_file(parray *files, const char *root, )
106106
static void parse_backup_filelist_filenames(parray *files, const char *root);
107-
static void write_backup_file_list(parray *files, const char *root);
108107
static void wait_wal_lsn(XLogRecPtr lsn, bool wait_prev_segment);
109108
static void wait_replica_wal_lsn(XLogRecPtr lsn, bool is_start_backup);
110109
static void make_pagemap_from_ptrack(parray *files);
@@ -515,8 +514,6 @@ do_backup_instance(void)
515514

516515
/* get list of backups already taken */
517516
backup_list = catalog_get_backup_list(INVALID_BACKUP_ID);
518-
if (backup_list == NULL)
519-
elog(ERROR, "Failed to get backup list.");
520517

521518
prev_backup = catalog_get_last_data_backup(backup_list, current.tli);
522519
if (prev_backup == NULL)
@@ -697,8 +694,11 @@ do_backup_instance(void)
697694
pg_atomic_clear_flag(&file->lock);
698695
}
699696

700-
/* sort by size for load balancing */
697+
/* Sort by size for load balancing */
701698
parray_qsort(backup_files_list, pgFileCompareSize);
699+
/* Sort the array for binary search */
700+
if (prev_backup_filelist)
701+
parray_qsort(prev_backup_filelist, pgFileComparePath);
702702

703703
/* init thread args with own file lists */
704704
threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
@@ -788,7 +788,7 @@ do_backup_instance(void)
788788
}
789789

790790
/* Print the list of files to backup catalog */
791-
write_backup_file_list(backup_files_list, pgdata);
791+
pgBackupWriteFileList(&current, backup_files_list, pgdata);
792792

793793
/* Compute summary of size of regular files in the backup */
794794
for (i = 0; i < parray_num(backup_files_list); i++)
@@ -2076,35 +2076,32 @@ backup_files(void *arg)
20762076
/* Check that file exist in previous backup */
20772077
if (current.backup_mode != BACKUP_MODE_FULL)
20782078
{
2079-
int p;
20802079
char *relative;
2081-
int n_prev_files = parray_num(arguments->prev_filelist);
2080+
pgFile key;
2081+
pgFile **prev_file;
20822082

20832083
relative = GetRelativePath(file->path, arguments->from_root);
2084-
for (p = 0; p < n_prev_files; p++)
2085-
{
2086-
pgFile *prev_file;
2087-
2088-
prev_file = (pgFile *) parray_get(arguments->prev_filelist, p);
2084+
key.path = relative;
20892085

2090-
if (strcmp(relative, prev_file->path) == 0)
2091-
{
2092-
/* File exists in previous backup */
2093-
file->exists_in_prev = true;
2094-
// elog(VERBOSE, "File exists at the time of previous backup %s", relative);
2095-
break;
2096-
}
2097-
}
2086+
prev_file = (pgFile **) parray_bsearch(arguments->prev_filelist,
2087+
&key, pgFileComparePath);
2088+
if (prev_file)
2089+
/* File exists in previous backup */
2090+
file->exists_in_prev = true;
20982091
}
20992092
/* copy the file into backup */
21002093
if (file->is_datafile && !file->is_cfs)
21012094
{
2095+
char to_path[MAXPGPATH];
2096+
2097+
join_path_components(to_path, arguments->to_root,
2098+
file->path + strlen(arguments->from_root) + 1);
2099+
21022100
/* backup block by block if datafile AND not compressed by cfs*/
2103-
if (!backup_data_file(arguments,
2104-
arguments->from_root,
2105-
arguments->to_root, file,
2101+
if (!backup_data_file(arguments, to_path, file,
21062102
arguments->prev_start_lsn,
2107-
current.backup_mode))
2103+
current.backup_mode,
2104+
compress_alg, compress_level))
21082105
{
21092106
file->write_size = BYTES_INVALID;
21102107
elog(VERBOSE, "File \"%s\" was not copied to backup", file->path);
@@ -2279,31 +2276,6 @@ set_cfs_datafiles(parray *files, const char *root, char *relative, size_t i)
22792276
free(cfs_tblspc_path);
22802277
}
22812278

2282-
2283-
/*
2284-
* Output the list of files to backup catalog DATABASE_FILE_LIST
2285-
*/
2286-
static void
2287-
write_backup_file_list(parray *files, const char *root)
2288-
{
2289-
FILE *fp;
2290-
char path[MAXPGPATH];
2291-
2292-
pgBackupGetPath(&current, path, lengthof(path), DATABASE_FILE_LIST);
2293-
2294-
fp = fopen(path, "wt");
2295-
if (fp == NULL)
2296-
elog(ERROR, "cannot open file list \"%s\": %s", path,
2297-
strerror(errno));
2298-
2299-
print_file_list(fp, files, root);
2300-
2301-
if (fflush(fp) != 0 ||
2302-
fsync(fileno(fp)) != 0 ||
2303-
fclose(fp))
2304-
elog(ERROR, "cannot write file list \"%s\": %s", path, strerror(errno));
2305-
}
2306-
23072279
/*
23082280
* Find pgfile by given rnode in the backup_files_list
23092281
* and add given blkno to its pagemap.

src/catalog.c

Lines changed: 86 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,9 @@ catalog_get_backup_list(time_t requested_backup_id)
325325
if (backups)
326326
parray_walk(backups, pgBackupFree);
327327
parray_free(backups);
328+
329+
elog(ERROR, "Failed to get backup list");
330+
328331
return NULL;
329332
}
330333

@@ -389,7 +392,7 @@ pgBackupWriteControl(FILE *out, pgBackup *backup)
389392
deparse_compress_alg(backup->compress_alg));
390393
fprintf(out, "compress-level = %d\n", backup->compress_level);
391394
fprintf(out, "from-replica = %s\n", backup->from_replica ? "true" : "false");
392-
395+
393396
fprintf(out, "\n#Compatibility\n");
394397
fprintf(out, "block-size = %u\n", backup->block_size);
395398
fprintf(out, "xlog-block-size = %u\n", backup->wal_block_size);
@@ -462,6 +465,30 @@ pgBackupWriteBackupControlFile(pgBackup *backup)
462465
fclose(fp);
463466
}
464467

468+
/*
469+
* Output the list of files to backup catalog DATABASE_FILE_LIST
470+
*/
471+
void
472+
pgBackupWriteFileList(pgBackup *backup, parray *files, const char *root)
473+
{
474+
FILE *fp;
475+
char path[MAXPGPATH];
476+
477+
pgBackupGetPath(backup, path, lengthof(path), DATABASE_FILE_LIST);
478+
479+
fp = fopen(path, "wt");
480+
if (fp == NULL)
481+
elog(ERROR, "cannot open file list \"%s\": %s", path,
482+
strerror(errno));
483+
484+
print_file_list(fp, files, root);
485+
486+
if (fflush(fp) != 0 ||
487+
fsync(fileno(fp)) != 0 ||
488+
fclose(fp))
489+
elog(ERROR, "cannot write file list \"%s\": %s", path, strerror(errno));
490+
}
491+
465492
/*
466493
* Read BACKUP_CONTROL_FILE and create pgBackup.
467494
* - Comment starts with ';'.
@@ -515,7 +542,7 @@ readBackupControlFile(const char *path)
515542
return NULL;
516543
}
517544

518-
pgBackup_init(backup);
545+
pgBackupInit(backup);
519546
parsed_options = pgut_readopt(path, options, WARNING);
520547

521548
if (parsed_options == 0)
@@ -566,10 +593,12 @@ readBackupControlFile(const char *path)
566593
{
567594
if (strcmp(status, "OK") == 0)
568595
backup->status = BACKUP_STATUS_OK;
569-
else if (strcmp(status, "RUNNING") == 0)
570-
backup->status = BACKUP_STATUS_RUNNING;
571596
else if (strcmp(status, "ERROR") == 0)
572597
backup->status = BACKUP_STATUS_ERROR;
598+
else if (strcmp(status, "RUNNING") == 0)
599+
backup->status = BACKUP_STATUS_RUNNING;
600+
else if (strcmp(status, "MERGING") == 0)
601+
backup->status = BACKUP_STATUS_MERGING;
573602
else if (strcmp(status, "DELETING") == 0)
574603
backup->status = BACKUP_STATUS_DELETING;
575604
else if (strcmp(status, "DELETED") == 0)
@@ -698,11 +727,63 @@ deparse_compress_alg(int alg)
698727
return NULL;
699728
}
700729

730+
/*
731+
* Fill pgBackup struct with default values.
732+
*/
733+
void
734+
pgBackupInit(pgBackup *backup)
735+
{
736+
backup->backup_id = INVALID_BACKUP_ID;
737+
backup->backup_mode = BACKUP_MODE_INVALID;
738+
backup->status = BACKUP_STATUS_INVALID;
739+
backup->tli = 0;
740+
backup->start_lsn = 0;
741+
backup->stop_lsn = 0;
742+
backup->start_time = (time_t) 0;
743+
backup->end_time = (time_t) 0;
744+
backup->recovery_xid = 0;
745+
backup->recovery_time = (time_t) 0;
746+
747+
backup->data_bytes = BYTES_INVALID;
748+
backup->wal_bytes = BYTES_INVALID;
749+
750+
backup->compress_alg = COMPRESS_ALG_DEFAULT;
751+
backup->compress_level = COMPRESS_LEVEL_DEFAULT;
752+
753+
backup->block_size = BLCKSZ;
754+
backup->wal_block_size = XLOG_BLCKSZ;
755+
backup->checksum_version = 0;
756+
757+
backup->stream = false;
758+
backup->from_replica = false;
759+
backup->parent_backup = INVALID_BACKUP_ID;
760+
backup->primary_conninfo = NULL;
761+
backup->program_version[0] = '\0';
762+
backup->server_version[0] = '\0';
763+
}
764+
765+
/*
766+
* Copy backup metadata from **src** into **dst**.
767+
*/
768+
void
769+
pgBackupCopy(pgBackup *dst, pgBackup *src)
770+
{
771+
pfree(dst->primary_conninfo);
772+
773+
memcpy(dst, src, sizeof(pgBackup));
774+
775+
if (src->primary_conninfo)
776+
dst->primary_conninfo = pstrdup(src->primary_conninfo);
777+
}
778+
701779
/* free pgBackup object */
702780
void
703781
pgBackupFree(void *backup)
704782
{
705-
free(backup);
783+
pgBackup *b = (pgBackup *) backup;
784+
785+
pfree(b->primary_conninfo);
786+
pfree(backup);
706787
}
707788

708789
/* Compare two pgBackup with their IDs (start time) in ascending order */

0 commit comments

Comments
 (0)