Skip to content

[pbckp-128] dry-run option for catchup #477

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 1, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ env:
- PG_VERSION=9.5 PG_BRANCH=REL9_5_STABLE
# - PG_VERSION=13 PG_BRANCH=REL_13_STABLE PTRACK_PATCH_PG_BRANCH=off MODE=archive
# - PG_VERSION=13 PG_BRANCH=REL_13_STABLE PTRACK_PATCH_PG_BRANCH=REL_13_STABLE MODE=backup
# - PG_VERSION=13 PG_BRANCH=REL_13_STABLE PTRACK_PATCH_PG_BRANCH=REL_13_STABLE MODE=catchup
- PG_VERSION=13 PG_BRANCH=REL_13_STABLE PTRACK_PATCH_PG_BRANCH=REL_13_STABLE MODE=catchup
# - PG_VERSION=13 PG_BRANCH=REL_13_STABLE PTRACK_PATCH_PG_BRANCH=off MODE=compression
# - PG_VERSION=13 PG_BRANCH=REL_13_STABLE PTRACK_PATCH_PG_BRANCH=off MODE=delta
# - PG_VERSION=13 PG_BRANCH=REL_13_STABLE PTRACK_PATCH_PG_BRANCH=off MODE=locking
Expand Down
113 changes: 69 additions & 44 deletions src/catchup.c
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,16 @@ catchup_preflight_checks(PGNodeInfo *source_node_info, PGconn *source_conn,
source_id = get_system_identifier(source_pgdata, FIO_DB_HOST, false); /* same as instance_config.system_identifier */

if (source_conn_id != source_id)
elog(ERROR, "Database identifiers mismatch: we connected to DB id %lu, but in \"%s\" we found id %lu",
elog(ERROR, "Database identifiers mismatch: we %s connected to DB id %lu, but in \"%s\" we found id %lu",
dry_run? "can":"will",
source_conn_id, source_pgdata, source_id);

if (current.backup_mode != BACKUP_MODE_FULL)
{
dest_id = get_system_identifier(dest_pgdata, FIO_LOCAL_HOST, false);
if (source_conn_id != dest_id)
elog(ERROR, "Database identifiers mismatch: we connected to DB id %lu, but in \"%s\" we found id %lu",
elog(ERROR, "Database identifiers mismatch: we %s connected to DB id %lu, but in \"%s\" we found id %lu",
dry_run? "can":"will",
source_conn_id, dest_pgdata, dest_id);
}
}
Expand Down Expand Up @@ -706,9 +708,12 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,

/* Start stream replication */
join_path_components(dest_xlog_path, dest_pgdata, PG_XLOG_DIR);
fio_mkdir(dest_xlog_path, DIR_PERMISSION, FIO_LOCAL_HOST);
start_WAL_streaming(source_conn, dest_xlog_path, &instance_config.conn_opt,
current.start_lsn, current.tli, false);
if (!dry_run)
{
fio_mkdir(dest_xlog_path, DIR_PERMISSION, FIO_LOCAL_HOST);
start_WAL_streaming(source_conn, dest_xlog_path, &instance_config.conn_opt,
current.start_lsn, current.tli, false);
}

source_filelist = parray_new();

Expand Down Expand Up @@ -820,9 +825,9 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
char dirpath[MAXPGPATH];

join_path_components(dirpath, dest_pgdata, file->rel_path);

elog(VERBOSE, "Create directory '%s'", dirpath);
fio_mkdir(dirpath, DIR_PERMISSION, FIO_LOCAL_HOST);
elog(VERBOSE, "Directory '%s' %s be created", dirpath, dry_run? "can":"will");
if (!dry_run)
fio_mkdir(dirpath, DIR_PERMISSION, FIO_LOCAL_HOST);
}
else
{
Expand Down Expand Up @@ -850,18 +855,21 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,

join_path_components(to_path, dest_pgdata, file->rel_path);

elog(VERBOSE, "Create directory \"%s\" and symbolic link \"%s\"",
linked_path, to_path);
elog(VERBOSE, "Directory \"%s\" and symbolic link \"%s\" %s be created",
linked_path, to_path, dry_run? "can":"will");

/* create tablespace directory */
if (fio_mkdir(linked_path, file->mode, FIO_LOCAL_HOST) != 0)
elog(ERROR, "Could not create tablespace directory \"%s\": %s",
linked_path, strerror(errno));

/* create link to linked_path */
if (fio_symlink(linked_path, to_path, true, FIO_LOCAL_HOST) < 0)
elog(ERROR, "Could not create symbolic link \"%s\" -> \"%s\": %s",
linked_path, to_path, strerror(errno));
if (!dry_run)
{
/* create tablespace directory */
if (fio_mkdir(linked_path, file->mode, FIO_LOCAL_HOST) != 0)
elog(ERROR, "Could not create tablespace directory \"%s\": %s",
linked_path, strerror(errno));

/* create link to linked_path */
if (fio_symlink(linked_path, to_path, true, FIO_LOCAL_HOST) < 0)
elog(ERROR, "Could not create symbolic link \"%s\" -> \"%s\": %s",
linked_path, to_path, strerror(errno));
}
}
}

Expand Down Expand Up @@ -901,7 +909,7 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
*/
if (current.backup_mode != BACKUP_MODE_FULL)
{
elog(INFO, "Removing redundant files in destination directory");
elog(INFO, "Redundant files %s in destination directory", dry_run ? "can" : "will");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Не понимаю новое сообщение

parray_qsort(dest_filelist, pgFileCompareRelPathWithExternalDesc);
for (i = 0; i < parray_num(dest_filelist); i++)
{
Expand Down Expand Up @@ -930,11 +938,15 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
char fullpath[MAXPGPATH];

join_path_components(fullpath, dest_pgdata, file->rel_path);
fio_delete(file->mode, fullpath, FIO_LOCAL_HOST);
elog(VERBOSE, "Deleted file \"%s\"", fullpath);
if (!dry_run)
{
fio_delete(file->mode, fullpath, FIO_LOCAL_HOST);
elog(VERBOSE, "File \"%s\" %s deleted", fullpath, dry_run ? "can" : "will");
}

/* shrink dest pgdata list */
pgFileFree(file);
if (!dry_run)
pgFileFree(file);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pgFileFree -- это же про освобождение памяти, зачем его пропускать?

parray_remove(dest_filelist, i);
i--;
}
Expand All @@ -951,17 +963,20 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
if (dest_filelist)
parray_qsort(dest_filelist, pgFileCompareRelPathWithExternal);

/* run copy threads */
elog(INFO, "Start transferring data files");
time(&start_time);
transfered_datafiles_bytes = catchup_multithreaded_copy(num_threads, &source_node_info,
source_pgdata, dest_pgdata,
source_filelist, dest_filelist,
dest_redo.lsn, current.backup_mode);
catchup_isok = transfered_datafiles_bytes != -1;
if (!dry_run)
{
/* run copy threads */
elog(INFO, "Start transferring data files");
time(&start_time);
transfered_datafiles_bytes = catchup_multithreaded_copy(num_threads, &source_node_info,
source_pgdata, dest_pgdata,
source_filelist, dest_filelist,
dest_redo.lsn, current.backup_mode);
catchup_isok = transfered_datafiles_bytes != -1;
}

/* at last copy control file */
if (catchup_isok)
if (catchup_isok && !dry_run)
{
char from_fullpath[MAXPGPATH];
char to_fullpath[MAXPGPATH];
Expand All @@ -972,7 +987,7 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
transfered_datafiles_bytes += source_pg_control_file->size;
}

if (!catchup_isok)
if (!catchup_isok && !dry_run)
{
char pretty_time[20];
char pretty_transfered_data_bytes[20];
Expand Down Expand Up @@ -1010,15 +1025,19 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
pg_free(stop_backup_query_text);
}

wait_wal_and_calculate_stop_lsn(dest_xlog_path, stop_backup_result.lsn, &current);
if (!dry_run)
wait_wal_and_calculate_stop_lsn(dest_xlog_path, stop_backup_result.lsn, &current);

#if PG_VERSION_NUM >= 90600
/* Write backup_label */
Assert(stop_backup_result.backup_label_content != NULL);
pg_stop_backup_write_file_helper(dest_pgdata, PG_BACKUP_LABEL_FILE, "backup label",
stop_backup_result.backup_label_content, stop_backup_result.backup_label_content_len,
NULL);
free(stop_backup_result.backup_label_content);
if (!dry_run)
{
pg_stop_backup_write_file_helper(dest_pgdata, PG_BACKUP_LABEL_FILE, "backup label",
stop_backup_result.backup_label_content, stop_backup_result.backup_label_content_len,
NULL);
free(stop_backup_result.backup_label_content);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Получается что тут память утекает?

}
stop_backup_result.backup_label_content = NULL;
stop_backup_result.backup_label_content_len = 0;

Expand All @@ -1040,6 +1059,7 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
#endif

/* wait for end of wal streaming and calculate wal size transfered */
if (!dry_run)
{
parray *wal_files_list = NULL;
wal_files_list = parray_new();
Expand Down Expand Up @@ -1081,7 +1101,8 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
pretty_size(transfered_datafiles_bytes, pretty_transfered_data_bytes, lengthof(pretty_transfered_data_bytes));
pretty_size(transfered_walfiles_bytes, pretty_transfered_wal_bytes, lengthof(pretty_transfered_wal_bytes));

elog(INFO, "Databases synchronized. Transfered datafiles size: %s, transfered wal size: %s, time elapsed: %s",
elog(INFO, "Databases %s synchronized. Transfered datafiles size: %s, transfered wal size: %s, time elapsed: %s",
dry_run ? "can be" : "was",
pretty_transfered_data_bytes, pretty_transfered_wal_bytes, pretty_time);

if (current.backup_mode != BACKUP_MODE_FULL)
Expand All @@ -1091,13 +1112,17 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
}

/* Sync all copied files unless '--no-sync' flag is used */
if (sync_dest_files)
catchup_sync_destination_files(dest_pgdata, FIO_LOCAL_HOST, source_filelist, source_pg_control_file);
else
elog(WARNING, "Files are not synced to disk");
if (!dry_run)
{
/* Sync all copied files unless '--no-sync' flag is used */
if (sync_dest_files)
catchup_sync_destination_files(dest_pgdata, FIO_LOCAL_HOST, source_filelist, source_pg_control_file);
else
elog(WARNING, "Files are not synced to disk");
}

/* Cleanup */
if (dest_filelist)
if (dest_filelist && !dry_run)
{
parray_walk(dest_filelist, pgFileFree);
parray_free(dest_filelist);
Expand Down
3 changes: 3 additions & 0 deletions src/help.c
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,7 @@ help_catchup(void)
printf(_(" [--remote-proto] [--remote-host]\n"));
printf(_(" [--remote-port] [--remote-path] [--remote-user]\n"));
printf(_(" [--ssh-options]\n"));
printf(_(" [--dry-run]\n"));
printf(_(" [--help]\n\n"));

printf(_(" -b, --backup-mode=catchup-mode catchup mode=FULL|DELTA|PTRACK\n"));
Expand Down Expand Up @@ -1081,4 +1082,6 @@ help_catchup(void)
printf(_(" --remote-user=username user name for ssh connection (default: current user)\n"));
printf(_(" --ssh-options=ssh_options additional ssh options (default: none)\n"));
printf(_(" (example: --ssh-options='-c cipher_spec -F configfile')\n\n"));

printf(_(" --dry-run perform a trial run without any changes\n\n"));
}
5 changes: 3 additions & 2 deletions src/ptrack.c
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,9 @@ make_pagemap_from_ptrack_2(parray *files,
page_map_entry *dummy_map = NULL;

/* Receive all available ptrack bitmaps at once */
filemaps = pg_ptrack_get_pagemapset(backup_conn, ptrack_schema,
ptrack_version_num, lsn);
if (!dry_run)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Так я думаю, что как раз это-то можно и не пропускать, потому что этот вызов ничего не меняет в данных

filemaps = pg_ptrack_get_pagemapset(backup_conn, ptrack_schema,
ptrack_version_num, lsn);

if (filemaps != NULL)
parray_qsort(filemaps, pgFileMapComparePath);
Expand Down
98 changes: 98 additions & 0 deletions tests/catchup.py
Original file line number Diff line number Diff line change
Expand Up @@ -1455,3 +1455,101 @@ def test_config_exclusion(self):
dst_pg.stop()
#self.assertEqual(1, 0, 'Stop test')
self.del_test_dir(module_name, self.fname)

def test_dry_run_catchup_full(self):
    """
    Test the --dry-run option for a FULL catchup: a dry run must not
    make any change to the destination directory, so its content
    before and after the call must be identical.
    """
    # preparation 1: source node with a little data in it
    src_pg = self.make_simple_node(
        base_dir = os.path.join(module_name, self.fname, 'src'),
        set_replication = True,
        pg_options = { 'wal_log_hints': 'on' }
        )
    src_pg.slow_start()
    src_pg.safe_psql(
        "postgres",
        "CREATE TABLE ultimate_question(answer int)")

    # preparation 2: empty destination node (catchup target)
    dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))

    # Snapshot the destination CONTENT before the dry run.
    # Saving only the path (dst_pg.data_dir) and taking both snapshots
    # after the call would compare the directory with itself, making the
    # test vacuous.
    # NOTE(review): assumes make_empty_node creates the data dir so
    # pgdata_content can read it — same precondition the post-call
    # snapshot already relies on.
    content_before = self.pgdata_content(dst_pg.data_dir)

    # do full catchup in dry-run mode
    self.catchup_node(
        backup_mode = 'FULL',
        source_pgdata = src_pg.data_dir,
        destination_node = dst_pg,
        options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream', '--dry-run']
        )

    # compare data dirs before and after catchup:
    # a dry run must have changed nothing
    self.compare_pgdata(
        content_before,
        self.pgdata_content(dst_pg.data_dir)
        )

    # Cleanup
    src_pg.stop()
    self.del_test_dir(module_name, self.fname)

def test_dry_run_catchup_ptrack(self):
    """
    Test the --dry-run option for an incremental (PTRACK) catchup:
    a dry run must leave the destination directory untouched.
    """
    if not self.ptrack:
        return unittest.skip('Skipped because ptrack support is disabled')

    # preparation 1: source node with a little data in it
    src_pg = self.make_simple_node(
        base_dir = os.path.join(module_name, self.fname, 'src'),
        set_replication = True,
        pg_options = { 'wal_log_hints': 'on' }
        )
    src_pg.slow_start()
    src_pg.safe_psql(
        "postgres",
        "CREATE TABLE ultimate_question(answer int)")

    # preparation 2: make a cleanly shut down replica lagging behind the source
    dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
    self.catchup_node(
        backup_mode = 'FULL',
        source_pgdata = src_pg.data_dir,
        destination_node = dst_pg,
        options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
        )
    self.set_replica(src_pg, dst_pg)
    dst_options = {}
    dst_options['port'] = str(dst_pg.port)
    self.set_auto_conf(dst_pg, dst_options)
    dst_pg.slow_start(replica = True)
    dst_pg.stop()

    # Snapshot the destination CONTENT before the dry run.
    # Saving only the path and snapshotting after the call would
    # compare the directory with itself, making the test vacuous.
    content_before = self.pgdata_content(dst_pg.data_dir)

    # do incremental catchup in dry-run mode
    self.catchup_node(
        backup_mode = 'PTRACK',
        source_pgdata = src_pg.data_dir,
        destination_node = dst_pg,
        options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream', '--dry-run']
        )

    # compare data dirs before and after catchup:
    # a dry run must have changed nothing
    self.compare_pgdata(
        content_before,
        self.pgdata_content(dst_pg.data_dir)
        )

    # Cleanup
    src_pg.stop()
    self.del_test_dir(module_name, self.fname)

1 change: 1 addition & 0 deletions tests/expected/option_help.out
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ pg_probackup - utility to manage backup/recovery of PostgreSQL database.
[--remote-proto] [--remote-host]
[--remote-port] [--remote-path] [--remote-user]
[--ssh-options]
[--dry-run]
[--help]

Read the website for details. <https://github.com/postgrespro/pg_probackup>
Expand Down