From 4dc995c759e4b9af50bae47455c50d256294bf9d Mon Sep 17 00:00:00 2001 From: Eric Bollengier Date: Tue, 6 Jan 2009 12:54:09 +0000 Subject: [PATCH] ebl Despool attributes directly from the director if attribute spool file is present git-svn-id: https://bacula.svn.sourceforge.net/svnroot/bacula/trunk@8324 91ce42f0-d328-0410-95d8-f526ca767f89 --- bacula/src/dird/catreq.c | 135 +++++++++++++++++++++++++++++++------- bacula/src/dird/getmsg.c | 18 ++++- bacula/src/dird/protos.h | 1 + bacula/src/stored/spool.c | 42 +++++++++--- bacula/technotes-2.5 | 3 + 5 files changed, 166 insertions(+), 33 deletions(-) diff --git a/bacula/src/dird/catreq.c b/bacula/src/dird/catreq.c index 661afd2004..69f9488eec 100644 --- a/bacula/src/dird/catreq.c +++ b/bacula/src/dird/catreq.c @@ -357,13 +357,11 @@ void catalog_request(JCR *jcr, BSOCK *bs) } /* - * Update File Attributes in the catalog with data - * sent by the Storage daemon. Note, we receive the whole - * attribute record, but we select out only the stat packet, - * VolSessionId, VolSessionTime, FileIndex, file type, and - * file name to store in the catalog. + * Note, we receive the whole attribute record, but we select out only the stat + * packet, VolSessionId, VolSessionTime, FileIndex, file type, and file name to + * store in the catalog. */ -void catalog_update(JCR *jcr, BSOCK *bs) +static void update_attribute(JCR *jcr, char *msg, int32_t msglen) { unser_declare; uint32_t VolSessionId, VolSessionTime; @@ -375,19 +373,6 @@ void catalog_update(JCR *jcr, BSOCK *bs) int len; char *fname, *attr; ATTR_DBR *ar = NULL; - POOLMEM *omsg; - - if (job_canceled(jcr) || !jcr->pool->catalog_files) { - goto bail_out; /* user disabled cataloging */ - } - if (!jcr->db) { - omsg = get_memory(bs->msglen+1); - pm_strcpy(omsg, bs->msg); - bs->fsend(_("1994 Invalid Catalog Update: %s"), omsg); - Jmsg1(jcr, M_FATAL, 0, _("Invalid Catalog Update; DB not open: %s"), omsg); - free_memory(omsg); - goto bail_out; - } /* Start transaction allocates jcr->attr and jcr->ar if needed */ db_start_transaction(jcr, jcr->db); /* start transaction if not already open */ @@ -397,7 +382,7 @@ void catalog_update(JCR *jcr, BSOCK *bs) * there may be a cached attr so we cannot yet write into * jcr->attr or jcr->ar */ - p = bs->msg; + p = msg; skip_nonspaces(&p); /* UpdCat */ skip_spaces(&p); skip_nonspaces(&p); /* Job=nnn */ @@ -412,7 +397,7 @@ void catalog_update(JCR *jcr, BSOCK *bs) unser_uint32(data_len); p += unser_length(p); - Dmsg1(400, "UpdCat msg=%s\n", bs->msg); + Dmsg1(400, "UpdCat msg=%s\n", msg); Dmsg5(400, "UpdCat VolSessId=%d VolSessT=%d FI=%d Strm=%d data_len=%d\n", VolSessionId, VolSessionTime, FileIndex, Stream, data_len); @@ -424,9 +409,9 @@ void catalog_update(JCR *jcr, BSOCK *bs) } } /* Any cached attr is flushed so we can reuse jcr->attr and jcr->ar */ - jcr->attr = check_pool_memory_size(jcr->attr, bs->msglen); - memcpy(jcr->attr, bs->msg, bs->msglen); - p = jcr->attr - bs->msg + p; /* point p into jcr->attr */ + jcr->attr = check_pool_memory_size(jcr->attr, msglen); + memcpy(jcr->attr, msg, msglen); + p = jcr->attr - msg + p; /* point p into jcr->attr */ skip_nonspaces(&p); /* skip FileIndex */ skip_spaces(&p); filetype = str_to_int32(p); /* TODO: choose between unserialize and str_to_int32 */ @@ -510,8 +495,110 @@ void catalog_update(JCR *jcr, BSOCK *bs) } } } +} + +/* + * Update File Attributes in the catalog with data + * sent by the Storage daemon. + */ +void catalog_update(JCR *jcr, BSOCK *bs) +{ + POOLMEM *omsg; + + if (job_canceled(jcr) || !jcr->pool->catalog_files) { + goto bail_out; /* user disabled cataloging */ + } + if (!jcr->db) { + omsg = get_memory(bs->msglen+1); + pm_strcpy(omsg, bs->msg); + bs->fsend(_("1994 Invalid Catalog Update: %s"), omsg); + Jmsg1(jcr, M_FATAL, 0, _("Invalid Catalog Update; DB not open: %s"), omsg); + free_memory(omsg); + goto bail_out; + + } + + update_attribute(jcr, bs->msg, bs->msglen); + bail_out: if (job_canceled(jcr)) { cancel_storage_daemon_job(jcr); } } + +/* + * Update File Attributes in the catalog with data read from + * the storage daemon spool file. We receive the filename and + * we try to read it. + */ +bool despool_attributes_from_file(JCR *jcr, const char *file) +{ + bool ret=false; + int32_t pktsiz; + size_t nbytes; + ssize_t last = 0, size = 0; + int count = 0; + int32_t msglen; /* message length */ + POOLMEM *msg = get_pool_memory(PM_MESSAGE); + FILE *spool_fd=NULL; + + Dmsg0(100, "Begin despool_attributes_from_file\n"); + + if (job_canceled(jcr) || !jcr->pool->catalog_files || !jcr->db) { + goto bail_out; /* user disabled cataloging */ + } + + spool_fd = fopen(file, "rb"); + if (!spool_fd) { + Dmsg0(100, "cancel despool_attributes_from_file\n"); + /* send an error message */ + goto bail_out; + } +#if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED) + posix_fadvise(fileno(spool_fd), 0, 0, POSIX_FADV_WILLNEED); +#endif + + while (fread((char *)&pktsiz, 1, sizeof(int32_t), spool_fd) == + sizeof(int32_t)) { + size += sizeof(int32_t); + msglen = ntohl(pktsiz); + if (msglen > 0) { + if (msglen > (int32_t) sizeof_pool_memory(msg)) { + msg = realloc_pool_memory(msg, msglen + 1); + } + nbytes = fread(msg, 1, msglen, spool_fd); + if (nbytes != (size_t) msglen) { + berrno be; + Dmsg2(400, "nbytes=%d msglen=%d\n", nbytes, msglen); + Qmsg1(jcr, M_FATAL, 0, _("fread attr spool error. ERR=%s\n"), + be.bstrerror()); + goto bail_out; + } + size += nbytes; + if ((++count & 0x3F) == 0) { + last = size; + } + } + update_attribute(jcr, msg, msglen); + } + if (ferror(spool_fd)) { + berrno be; + Qmsg1(jcr, M_FATAL, 0, _("fread attr spool error. ERR=%s\n"), + be.bstrerror()); + goto bail_out; + } + ret = true; + +bail_out: + if (spool_fd) { + fclose(spool_fd); + } + + if (job_canceled(jcr)) { + cancel_storage_daemon_job(jcr); + } + + free_pool_memory(msg); + Dmsg1(100, "End despool_attributes_from_file ret=%i\n", ret); + return ret; +} diff --git a/bacula/src/dird/getmsg.c b/bacula/src/dird/getmsg.c index c0237489b4..b28a691091 100644 --- a/bacula/src/dird/getmsg.c +++ b/bacula/src/dird/getmsg.c @@ -135,7 +135,7 @@ int bget_dirmsg(BSOCK *bs) for (;;) { n = bs->recv(); - Dmsg2(300, "bget_dirmsg %d: %s\n", n, bs->msg); + Dmsg2(100, "bget_dirmsg %d: %s\n", n, bs->msg); if (is_bnet_stop(bs)) { return n; /* error or terminate */ @@ -236,6 +236,22 @@ int bget_dirmsg(BSOCK *bs) catalog_update(jcr, bs); continue; } + if (bs->msg[0] == 'B') { /* SD sending file spool attributes */ + Dmsg2(100, "Blast attributes jcr 0x%x: %s", jcr, bs->msg); + char filename[256]; + if (sscanf(bs->msg, "BlastAttr Job=%127s File=%255s", + Job, filename) != 2) { + Jmsg1(jcr, M_ERROR, 0, _("Malformed message: %s\n"), bs->msg); + continue; + } + unbash_spaces(filename); + if (despool_attributes_from_file(jcr, filename)) { + bs->fsend("1000 OK BlastAttr\n"); + } else { + bs->fsend("1990 ERROR BlastAttr\n"); + } + continue; + } if (bs->msg[0] == 'M') { /* Mount request */ Dmsg1(900, "Mount req: %s", bs->msg); mount_request(jcr, bs, msg); diff --git a/bacula/src/dird/protos.h b/bacula/src/dird/protos.h index 9cbdcb1135..7fa5ebfe44 100644 --- a/bacula/src/dird/protos.h +++ b/bacula/src/dird/protos.h @@ -80,6 +80,7 @@ void print_bsr(UAContext *ua, RESTORE_CTX &rx); /* catreq.c */ extern void catalog_request(JCR *jcr, BSOCK *bs); extern void catalog_update(JCR *jcr, BSOCK *bs); +extern bool despool_attributes_from_file(JCR *jcr, const char *file); /* dird_conf.c */ extern const char *level_to_str(int level); diff --git a/bacula/src/stored/spool.c b/bacula/src/stored/spool.c index 7e6246a0fb..9d6300ae6d 100644 --- a/bacula/src/stored/spool.c +++ b/bacula/src/stored/spool.c @@ -622,6 +622,33 @@ static void update_attr_spool_size(ssize_t size) V(mutex); } +static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd) +{ + Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name, + jcr->Job, fd); +} + +static bool blast_attr_spool_file(JCR *jcr, boffset_t size) +{ + /* send full spool file name */ + POOLMEM *name = get_pool_memory(PM_MESSAGE); + make_unique_spool_filename(jcr, &name, jcr->dir_bsock->m_fd); + bash_spaces(name); + jcr->dir_bsock->fsend("BlastAttr Job=%s File=%s\n", + jcr->Job, name); + free_pool_memory(name); + + if (jcr->dir_bsock->recv() <= 0) { + Jmsg(jcr, M_FATAL, 0, _("Network error on BlastAttributes.\n")); + return false; + } + + if (!bstrcmp(jcr->dir_bsock->msg, "1000 OK BlastAttr\n")) { + return false; + } + return true; +} + bool commit_attribute_spool(JCR *jcr) { boffset_t size; @@ -654,7 +681,13 @@ bool commit_attribute_spool(JCR *jcr) dir_send_job_status(jcr); Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"), edit_uint64_with_commas(size, ec1)); - jcr->dir_bsock->despool(update_attr_spool_size, size); + + if (!blast_attr_spool_file(jcr, size)) { + /* Can't read spool file from director side, + * send content over network. + */ + jcr->dir_bsock->despool(update_attr_spool_size, size); + } return close_attr_spool_file(jcr, jcr->dir_bsock); } return true; @@ -664,13 +697,6 @@ bail_out: return false; } -static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd) -{ - Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name, - jcr->Job, fd); -} - - bool open_attr_spool_file(JCR *jcr, BSOCK *bs) { POOLMEM *name = get_pool_memory(PM_MESSAGE); diff --git a/bacula/technotes-2.5 b/bacula/technotes-2.5 index 95a6a7d92a..7ba49fdc64 100644 --- a/bacula/technotes-2.5 +++ b/bacula/technotes-2.5 @@ -10,6 +10,9 @@ filepattern (restore with regex in bsr) mixed priorities General: +06Jan09 +ebl Despool attributes directly from the director if attribute + spool file is present Beta Release 2.5.28-b1 05Jan09 kes Fix bat install broken by $DESTDIR change. -- 2.39.5