Skip to content

[PBCKP-146] truncate cfm files (but calc CRC for whole) #549

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/archive.c
Original file line number Diff line number Diff line change
Expand Up @@ -1375,11 +1375,11 @@ get_wal_file(const char *filename, const char *from_fullpath,
#ifdef HAVE_LIBZ
/* If requested file is regular WAL segment, then try to open it with '.gz' suffix... */
if (IsXLogFileName(filename))
rc = fio_send_file_gz(from_fullpath_gz, to_fullpath, out, &errmsg);
rc = fio_send_file_gz(from_fullpath_gz, out, &errmsg);
if (rc == FILE_MISSING)
#endif
/* ... failing that, use uncompressed */
rc = fio_send_file(from_fullpath, to_fullpath, out, NULL, &errmsg);
rc = fio_send_file(from_fullpath, out, false, NULL, &errmsg);

/* When not in prefetch mode, try to use partial file */
if (rc == FILE_MISSING && !prefetch_mode && IsXLogFileName(filename))
Expand All @@ -1389,13 +1389,13 @@ get_wal_file(const char *filename, const char *from_fullpath,
#ifdef HAVE_LIBZ
/* '.gz.partial' goes first ... */
snprintf(from_partial, sizeof(from_partial), "%s.gz.partial", from_fullpath);
rc = fio_send_file_gz(from_partial, to_fullpath, out, &errmsg);
rc = fio_send_file_gz(from_partial, out, &errmsg);
if (rc == FILE_MISSING)
#endif
{
/* ... failing that, use '.partial' */
snprintf(from_partial, sizeof(from_partial), "%s.partial", from_fullpath);
rc = fio_send_file(from_partial, to_fullpath, out, NULL, &errmsg);
rc = fio_send_file(from_partial, out, false, NULL, &errmsg);
}

if (rc == SEND_OK)
Expand Down
9 changes: 9 additions & 0 deletions src/catalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -1069,6 +1069,7 @@ get_backup_filelist(pgBackup *backup, bool strict)
char linked[MAXPGPATH];
char compress_alg_string[MAXPGPATH];
int64 write_size,
full_size,
mode, /* bit length of mode_t depends on platforms */
is_datafile,
is_cfs,
Expand All @@ -1087,6 +1088,8 @@ get_backup_filelist(pgBackup *backup, bool strict)

get_control_value_str(buf, "path", path, sizeof(path),true);
get_control_value_int64(buf, "size", &write_size, true);
if (!get_control_value_int64(buf, "full_size", &full_size, false))
full_size = write_size;
get_control_value_int64(buf, "mode", &mode, true);
get_control_value_int64(buf, "is_datafile", &is_datafile, true);
get_control_value_int64(buf, "is_cfs", &is_cfs, false);
Expand All @@ -1097,6 +1100,7 @@ get_backup_filelist(pgBackup *backup, bool strict)

file = pgFileInit(path);
file->write_size = (int64) write_size;
file->uncompressed_size = full_size;
file->mode = (mode_t) mode;
file->is_datafile = is_datafile ? true : false;
file->is_cfs = is_cfs ? true : false;
Expand Down Expand Up @@ -2561,6 +2565,11 @@ write_backup_filelist(pgBackup *backup, parray *files, const char *root,
file->external_dir_num,
file->dbOid);

if (file->uncompressed_size != 0 &&
file->uncompressed_size != file->write_size)
len += sprintf(line+len, ",\"full_size\":\"" INT64_FORMAT "\"",
file->uncompressed_size);

if (file->is_datafile)
len += sprintf(line+len, ",\"segno\":\"%d\"", file->segno);

Expand Down
117 changes: 28 additions & 89 deletions src/data.c
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,7 @@ backup_non_data_file(pgFile *file, pgFile *prev_file,
* and its mtime is less than parent backup start time ... */
if ((pg_strcasecmp(file->name, RELMAPPER_FILENAME) != 0) &&
(prev_file && file->exists_in_prev &&
file->size == prev_file->size &&
file->mtime <= parent_backup_time))
{
/*
Expand Down Expand Up @@ -1387,10 +1388,12 @@ backup_non_data_file_internal(const char *from_fullpath,
const char *to_fullpath, pgFile *file,
bool missing_ok)
{
FILE *in = NULL;
FILE *out = NULL;
ssize_t read_len = 0;
char *buf = NULL;
char *errmsg = NULL;
int rc;
bool cut_zero_tail;

cut_zero_tail = file->forkName == cfm;

INIT_FILE_CRC32(true, file->crc);

Expand All @@ -1412,107 +1415,43 @@ backup_non_data_file_internal(const char *from_fullpath,

/* backup remote file */
if (fio_is_remote(FIO_DB_HOST))
{
char *errmsg = NULL;
int rc = fio_send_file(from_fullpath, to_fullpath, out, file, &errmsg);
rc = fio_send_file(from_fullpath, out, cut_zero_tail, file, &errmsg);
else
rc = fio_send_file_local(from_fullpath, out, cut_zero_tail, file, &errmsg);

/* handle errors */
if (rc == FILE_MISSING)
{
/* maybe deleted, it's not error in case of backup */
if (missing_ok)
{
elog(LOG, "File \"%s\" is not found", from_fullpath);
file->write_size = FILE_NOT_FOUND;
goto cleanup;
}
else
elog(ERROR, "File \"%s\" is not found", from_fullpath);
}
else if (rc == WRITE_FAILED)
elog(ERROR, "Cannot write to \"%s\": %s", to_fullpath, strerror(errno));
else if (rc != SEND_OK)
/* handle errors */
if (rc == FILE_MISSING)
{
/* maybe deleted, it's not error in case of backup */
if (missing_ok)
{
if (errmsg)
elog(ERROR, "%s", errmsg);
else
elog(ERROR, "Cannot access remote file \"%s\"", from_fullpath);
elog(LOG, "File \"%s\" is not found", from_fullpath);
file->write_size = FILE_NOT_FOUND;
goto cleanup;
}

pg_free(errmsg);
else
elog(ERROR, "File \"%s\" is not found", from_fullpath);
}
/* backup local file */
else
else if (rc == WRITE_FAILED)
elog(ERROR, "Cannot write to \"%s\": %s", to_fullpath, strerror(errno));
else if (rc != SEND_OK)
{
/* open source file for read */
in = fopen(from_fullpath, PG_BINARY_R);
if (in == NULL)
{
/* maybe deleted, it's not error in case of backup */
if (errno == ENOENT)
{
if (missing_ok)
{
elog(LOG, "File \"%s\" is not found", from_fullpath);
file->write_size = FILE_NOT_FOUND;
goto cleanup;
}
else
elog(ERROR, "File \"%s\" is not found", from_fullpath);
}

elog(ERROR, "Cannot open file \"%s\": %s", from_fullpath,
strerror(errno));
}

/* disable stdio buffering for local input/output files to avoid triple buffering */
setvbuf(in, NULL, _IONBF, BUFSIZ);
setvbuf(out, NULL, _IONBF, BUFSIZ);

/* allocate 64kB buffer */
buf = pgut_malloc(CHUNK_SIZE);

/* copy content and calc CRC */
for (;;)
{
read_len = fread(buf, 1, CHUNK_SIZE, in);

if (ferror(in))
elog(ERROR, "Cannot read from file \"%s\": %s",
from_fullpath, strerror(errno));

if (read_len > 0)
{
if (fwrite(buf, 1, read_len, out) != read_len)
elog(ERROR, "Cannot write to file \"%s\": %s", to_fullpath,
strerror(errno));

/* update CRC */
COMP_FILE_CRC32(true, file->crc, buf, read_len);
file->read_size += read_len;
}

if (feof(in))
break;
}
if (errmsg)
elog(ERROR, "%s", errmsg);
else
elog(ERROR, "Cannot access remote file \"%s\"", from_fullpath);
}

file->write_size = (int64) file->read_size;
pg_free(errmsg); /* ????? */

if (file->write_size > 0)
file->uncompressed_size = file->write_size;
file->uncompressed_size = file->read_size;

cleanup:
/* finish CRC calculation and store into pgFile */
FIN_FILE_CRC32(true, file->crc);

if (in && fclose(in))
elog(ERROR, "Cannot close the file \"%s\": %s", from_fullpath, strerror(errno));

if (out && fclose(out))
elog(ERROR, "Cannot close the file \"%s\": %s", to_fullpath, strerror(errno));

pg_free(buf);
}

/*
Expand Down
63 changes: 62 additions & 1 deletion src/dir.c
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,67 @@ pgFileGetCRC(const char *file_path, bool use_crc32c, bool missing_ok)
return crc;
}

static const char zerobuf[4096] = {0};

/*
* Read the local file to compute CRC for it extened to real_size.
*/
pg_crc32
pgFileGetCRCForTruncated(const char *file_path, bool use_crc32c, int64_t real_size)
{
FILE *fp;
pg_crc32 crc = 0;
char *buf;
size_t len = 0;
int64_t read_size = 0;

INIT_FILE_CRC32(use_crc32c, crc);

/* open file in binary read mode */
fp = fopen(file_path, PG_BINARY_R);
if (fp == NULL)
{
elog(ERROR, "Cannot open file \"%s\": %s",
file_path, strerror(errno));
}

/* disable stdio buffering */
setvbuf(fp, NULL, _IONBF, BUFSIZ);
buf = pgut_malloc(STDIO_BUFSIZE);

/* calc CRC of file */
for (;;)
{
if (interrupted)
elog(ERROR, "interrupted during CRC calculation");

len = fread(buf, 1, STDIO_BUFSIZE, fp);

if (ferror(fp))
elog(ERROR, "Cannot read \"%s\": %s", file_path, strerror(errno));

/* update CRC */
COMP_FILE_CRC32(use_crc32c, crc, buf, len);

read_size += len;

if (feof(fp))
break;
}

while (read_size < real_size)
{
len = Min(real_size - read_size, sizeof(zerobuf));
COMP_FILE_CRC32(use_crc32c, crc, zerobuf, len);
read_size += len;
}

FIN_FILE_CRC32(use_crc32c, crc);
fclose(fp);
pg_free(buf);

return crc;
}
/*
* Read the local file to compute its CRC.
* We cannot make decision about file decompression because
Expand Down Expand Up @@ -1812,7 +1873,7 @@ write_database_map(pgBackup *backup, parray *database_map, parray *backup_files_
FIO_BACKUP_HOST);
file->crc = pgFileGetCRC(database_map_path, true, false);
file->write_size = file->size;
file->uncompressed_size = file->read_size;
file->uncompressed_size = file->size;

parray_append(backup_files_list, file);
}
Expand Down
2 changes: 1 addition & 1 deletion src/merge.c
Original file line number Diff line number Diff line change
Expand Up @@ -1078,7 +1078,7 @@ merge_files(void *arg)
tmp_file->hdr_crc = file->hdr_crc;
}
else
tmp_file->uncompressed_size = tmp_file->write_size;
tmp_file->uncompressed_size = tmp_file->uncompressed_size;

/* Copy header metadata from old map into a new one */
tmp_file->n_headers = file->n_headers;
Expand Down
13 changes: 8 additions & 5 deletions src/pg_probackup.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,11 +345,11 @@ typedef enum ShowFormat
#define BYTES_INVALID (-1) /* file didn`t changed since previous backup, DELTA backup do not rely on it */
#define FILE_NOT_FOUND (-2) /* file disappeared during backup */
#define BLOCKNUM_INVALID (-1)
#define PROGRAM_VERSION "2.5.8"
#define PROGRAM_VERSION "2.5.9"

/* update when remote agent API or behaviour changes */
#define AGENT_PROTOCOL_VERSION 20501
#define AGENT_PROTOCOL_VERSION_STR "2.5.1"
#define AGENT_PROTOCOL_VERSION 20509
#define AGENT_PROTOCOL_VERSION_STR "2.5.9"

/* update only when changing storage format */
#define STORAGE_FORMAT_VERSION "2.4.4"
Expand Down Expand Up @@ -1077,6 +1077,7 @@ extern void fio_pgFileDelete(pgFile *file, const char *full_path);
extern void pgFileFree(void *file);

extern pg_crc32 pgFileGetCRC(const char *file_path, bool use_crc32c, bool missing_ok);
extern pg_crc32 pgFileGetCRCForTruncated(const char *file_path, bool use_crc32c, int64_t real_size);
extern pg_crc32 pgFileGetCRCgz(const char *file_path, bool use_crc32c, bool missing_ok);

extern int pgFileMapComparePath(const void *f1, const void *f2);
Expand Down Expand Up @@ -1240,9 +1241,11 @@ extern int fio_copy_pages(const char *to_fullpath, const char *from_fullpath, pg
XLogRecPtr horizonLsn, int calg, int clevel, uint32 checksum_version,
bool use_pagemap, BlockNumber *err_blknum, char **errormsg);
/* return codes for fio_send_pages */
extern int fio_send_file_gz(const char *from_fullpath, const char *to_fullpath, FILE* out, char **errormsg);
extern int fio_send_file(const char *from_fullpath, const char *to_fullpath, FILE* out,
extern int fio_send_file_gz(const char *from_fullpath, FILE* out, char **errormsg);
extern int fio_send_file(const char *from_fullpath, FILE* out, bool cut_zero_tail,
pgFile *file, char **errormsg);
extern int fio_send_file_local(const char *from_fullpath, FILE* out, bool cut_zero_tail,
pgFile *file, char **errormsg);

extern void fio_list_dir(parray *files, const char *root, bool exclude, bool follow_symlink,
bool add_root, bool backup_logs, bool skip_hidden, int external_dir_num);
Expand Down
Loading