/*
- * Spooling code
- *
- * Kern Sibbald, March 2004
- *
- * Version $Id$
- */
-/*
- Copyright (C) 2000-2004 Kern Sibbald and John Walker
+ Bacula® - The Network Backup Solution
+
+ Copyright (C) 2004-2007 Free Software Foundation Europe e.V.
- This program is free software; you can redistribute it and/or
- modify it under the terms of the GNU General Public License as
- published by the Free Software Foundation; either version 2 of
- the License, or (at your option) any later version.
+ The main author of Bacula is Kern Sibbald, with contributions from
+ many others, a complete list can be found in the file AUTHORS.
+ This program is Free Software; you can redistribute it and/or
+ modify it under the terms of version two of the GNU General Public
+ License as published by the Free Software Foundation plus additions
+ that are listed in the file LICENSE.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
- You should have received a copy of the GNU General Public
- License along with this program; if not, write to the Free
- Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
- MA 02111-1307, USA.
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ 02110-1301, USA.
+ Bacula® is a registered trademark of John Walker.
+ The licensor of Bacula is the Free Software Foundation Europe
+ (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
+ Switzerland, email:ftf@fsfeurope.org.
+*/
+/*
+ * Spooling code
+ *
+ * Kern Sibbald, March 2004
+ *
+ * Version $Id$
*/
#include "bacula.h"
#include "stored.h"
/* Forward referenced subroutines */
-static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name);
-static int open_data_spool_file(JCR *jcr);
-static int close_data_spool_file(JCR *jcr);
-static bool despool_data(DCR *dcr);
+static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
+static bool open_data_spool_file(DCR *dcr);
+static bool close_data_spool_file(DCR *dcr);
+static bool despool_data(DCR *dcr, bool commit);
+static int read_block_from_spool_file(DCR *dcr);
+static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
+static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
+static bool write_spool_header(DCR *dcr);
+static bool write_spool_data(DCR *dcr);
+
+struct spool_stats_t {
+ uint32_t data_jobs; /* current jobs spooling data */
+ uint32_t attr_jobs;
+ uint32_t total_data_jobs; /* total jobs to have spooled data */
+ uint32_t total_attr_jobs;
+ int64_t max_data_size; /* max data size */
+ int64_t max_attr_size;
+ int64_t data_size; /* current data size (all jobs running) */
+ int64_t attr_size;
+};
+
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+spool_stats_t spool_stats;
+
+/*
+ * Header for data spool record */
+struct spool_hdr {
+ int32_t FirstIndex; /* FirstIndex for buffer */
+ int32_t LastIndex; /* LastIndex for buffer */
+ uint32_t len; /* length of next buffer */
+};
+
+enum {
+ RB_EOT = 1,
+ RB_ERROR,
+ RB_OK
+};
+
+void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
+{
+ char ed1[30], ed2[30];
+ POOL_MEM msg(PM_MESSAGE);
+ int len;
+
+ if (spool_stats.data_jobs || spool_stats.max_data_size) {
+ len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
+ spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
+ spool_stats.total_data_jobs,
+ edit_uint64_with_commas(spool_stats.max_data_size, ed2));
+ sendit(msg.c_str(), len, arg);
+ }
+ if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
+ len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
+ spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
+ spool_stats.total_attr_jobs,
+ edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
+
+ sendit(msg.c_str(), len, arg);
+ }
+}
-int begin_data_spool(JCR *jcr)
+bool begin_data_spool(DCR *dcr)
{
- if (jcr->dcr->spool_data) {
- return open_data_spool_file(jcr);
+ bool stat = true;
+ if (!dcr->dev->is_dvd() && dcr->jcr->spool_data) {
+ Dmsg0(100, "Turning on data spooling\n");
+ dcr->spool_data = true;
+ stat = open_data_spool_file(dcr);
+ if (stat) {
+ dcr->spooling = true;
+ Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
+ P(mutex);
+ spool_stats.data_jobs++;
+ V(mutex);
+ }
}
- return 1;
+ return stat;
}
-int discard_data_spool(JCR *jcr)
+bool discard_data_spool(DCR *dcr)
{
- if (jcr->dcr->spool_data && jcr->dcr->spool_fd >= 0) {
- return close_data_spool_file(jcr);
+ if (dcr->spooling) {
+ Dmsg0(100, "Data spooling discarded\n");
+ return close_data_spool_file(dcr);
}
- return 1;
+ return true;
}
-int commit_data_spool(JCR *jcr)
+bool commit_data_spool(DCR *dcr)
{
bool stat;
- if (jcr->dcr->spool_data && jcr->dcr->spool_fd >= 0) {
- lock_device(jcr->dcr->dev);
- stat = despool_data(jcr->dcr);
- unlock_device(jcr->dcr->dev);
+
+ if (dcr->spooling) {
+ Dmsg0(100, "Committing spooled data\n");
+ stat = despool_data(dcr, true /*commit*/);
if (!stat) {
- close_data_spool_file(jcr);
- return 0;
+ Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
+ close_data_spool_file(dcr);
+ return false;
}
- return close_data_spool_file(jcr);
+ return close_data_spool_file(dcr);
}
- return 1;
+ return true;
}
-static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name)
+static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
{
- Mmsg(name, "%s/%s.data.spool.%s.%s", working_directory, my_name,
- jcr->Job, jcr->device->hdr.name);
+ const char *dir;
+ if (dcr->dev->device->spool_directory) {
+ dir = dcr->dev->device->spool_directory;
+ } else {
+ dir = working_directory;
+ }
+ Mmsg(name, "%s/%s.data.%u.%s.%s.spool", dir, my_name, dcr->jcr->JobId,
+ dcr->jcr->Job, dcr->device->hdr.name);
}
-static int open_data_spool_file(JCR *jcr)
+static bool open_data_spool_file(DCR *dcr)
{
POOLMEM *name = get_pool_memory(PM_MESSAGE);
int spool_fd;
- make_unique_data_spool_filename(jcr, &name);
+ make_unique_data_spool_filename(dcr, &name);
if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
- jcr->dcr->spool_fd = spool_fd;
- jcr->spool_attributes = true;
+ dcr->spool_fd = spool_fd;
+ dcr->jcr->spool_attributes = true;
} else {
- Jmsg(jcr, M_ERROR, 0, "open data spool file %s failed: ERR=%s\n", name, strerror(errno));
+ berrno be;
+ Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
+ be.strerror());
free_pool_memory(name);
- return 0;
- }
- free_pool_memory(name);
- return 1;
+ return false;
+ }
+ Dmsg1(100, "Created spool file: %s\n", name);
+ free_pool_memory(name);
+ return true;
}
-static int close_data_spool_file(JCR *jcr)
+static bool close_data_spool_file(DCR *dcr)
{
- POOLMEM *name = get_pool_memory(PM_MESSAGE);
+ POOLMEM *name = get_pool_memory(PM_MESSAGE);
+
+ P(mutex);
+ spool_stats.data_jobs--;
+ spool_stats.total_data_jobs++;
+ if (spool_stats.data_size < dcr->job_spool_size) {
+ spool_stats.data_size = 0;
+ } else {
+ spool_stats.data_size -= dcr->job_spool_size;
+ }
+ dcr->job_spool_size = 0;
+ V(mutex);
- make_unique_data_spool_filename(jcr, &name);
- close(jcr->dcr->spool_fd);
- jcr->dcr->spool_fd = -1;
- unlink(name);
- free_pool_memory(name);
- return 1;
+ make_unique_data_spool_filename(dcr, &name);
+ close(dcr->spool_fd);
+ dcr->spool_fd = -1;
+ dcr->spooling = false;
+ unlink(name);
+ Dmsg1(100, "Deleted spool file: %s\n", name);
+ free_pool_memory(name);
+ return true;
}
-static bool despool_data(DCR *dcr)
+static const char *spool_name = "*spool*";
+
+/*
+ * NB! This routine locks the device, but if committing will
+ * not unlock it. If not committing, it will be unlocked.
+ */
+static bool despool_data(DCR *dcr, bool commit)
{
- DEVICE *sdev;
- DCR *sdcr;
- dcr->spooling = false;
+ DEVICE *rdev;
+ DCR *rdcr;
bool ok = true;
- DEV_BLOCK *block = dcr->block;
+ DEV_BLOCK *block;
JCR *jcr = dcr->jcr;
+ int stat;
+ char ec1[50];
+
+ Dmsg0(100, "Despooling data\n");
+ /*
+ * Commit means that the job is done, so we commit, otherwise, we
+ * are despooling because of user spool size max or some error
+ * (e.g. filesystem full).
+ */
+ if (commit) {
+ Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
+ jcr->dcr->VolumeName,
+ edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
+ } else {
+ Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
+ edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
+ }
+ dcr->despool_wait = true;
+ dcr->spooling = false;
+ dcr->dev->r_dlock();
+ dcr->despool_wait = false;
+ dcr->despooling = true;
+ dcr->dev_locked = true;
+
+ /*
+ * This is really quite kludgy and should be fixed some time.
+ * We create a dev structure to read from the spool file
+ * in rdev and rdcr.
+ */
+ rdev = (DEVICE *)malloc(sizeof(DEVICE));
+ memset(rdev, 0, sizeof(DEVICE));
+ rdev->dev_name = get_memory(strlen(spool_name)+1);
+ bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
+ rdev->errmsg = get_pool_memory(PM_EMSG);
+ *rdev->errmsg = 0;
+ rdev->max_block_size = dcr->dev->max_block_size;
+ rdev->min_block_size = dcr->dev->min_block_size;
+ rdev->device = dcr->dev->device;
+ rdcr = new_dcr(jcr, rdev);
+ rdcr->spool_fd = dcr->spool_fd;
+ rdcr->jcr = jcr; /* set a valid jcr */
+ block = dcr->block; /* save block */
+ dcr->block = rdcr->block; /* make read and write block the same */
+
+ Dmsg1(800, "read/write block size = %d\n", block->buf_len);
+ lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
+
+ /* Add run time, to get current wait time */
+ time_t despool_start = time(NULL) - jcr->run_time;
- /* Set up a dev structure to read */
- sdev = (DEVICE *)malloc(sizeof(DEVICE));
- memset(sdev, 0, sizeof(DEVICE));
- sdev->fd = dcr->spool_fd;
- lseek(sdev->fd, 0, SEEK_SET); /* rewind */
- sdcr = new_dcr(jcr, sdev);
for ( ; ok; ) {
if (job_canceled(jcr)) {
- ok = false;
- break;
+ ok = false;
+ break;
}
- if (!read_block_from_dev(jcr, sdev, block, CHECK_BLOCK_NUMBERS)) {
- if (dev_state(sdev, ST_EOT)) {
- break;
- }
- ok = false;
- break;
+ stat = read_block_from_spool_file(rdcr);
+ if (stat == RB_EOT) {
+ break;
+ } else if (stat == RB_ERROR) {
+ ok = false;
+ break;
}
- if (!write_block_to_dev(dcr, block)) {
- ok = false;
- break;
+ ok = write_block_to_device(dcr);
+ if (!ok) {
+ Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
+ dcr->dev->print_name(), dcr->dev->bstrerror());
}
+ Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
+ }
+
+ /* Subtracting run_time give us elapsed time - wait_time since we started despooling */
+ time_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
+
+ if (despool_elapsed <= 0) {
+ despool_elapsed = 1;
}
- lseek(sdev->fd, 0, SEEK_SET); /* rewind */
- if (ftruncate(sdev->fd, 0) != 0) {
+
+ Jmsg(dcr->jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s bytes/second\n"),
+ despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
+ edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
+
+ dcr->block = block; /* reset block */
+
+ lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
+ if (ftruncate(rdcr->spool_fd, 0) != 0) {
+ berrno be;
+ Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
+ be.strerror());
+ Pmsg1(000, _("Bad return from ftruncate. ERR=%s\n"), be.strerror());
ok = false;
}
+
+ P(mutex);
+ if (spool_stats.data_size < dcr->job_spool_size) {
+ spool_stats.data_size = 0;
+ } else {
+ spool_stats.data_size -= dcr->job_spool_size;
+ }
+ V(mutex);
+ P(dcr->dev->spool_mutex);
+ dcr->dev->spool_size -= dcr->job_spool_size;
+ dcr->job_spool_size = 0; /* zap size in input dcr */
+ V(dcr->dev->spool_mutex);
+ free_memory(rdev->dev_name);
+ free_pool_memory(rdev->errmsg);
+ /* Be careful to NULL the jcr and free rdev after free_dcr() */
+ rdcr->jcr = NULL;
+ rdcr->dev = NULL;
+ free_dcr(rdcr);
+ free(rdev);
+ dcr->spooling = true; /* turn on spooling again */
+ dcr->despooling = false;
+ /* If doing a commit, leave the device locked -- unlocked in release_device() */
+ if (!commit) {
+ dcr->dev_locked = false;
+ dcr->dev->dunlock();
+ }
return ok;
}
+/*
+ * Read a block from the spool file
+ *
+ * Returns RB_OK on success
+ * RB_EOT when file done
+ * RB_ERROR on error
+ */
+static int read_block_from_spool_file(DCR *dcr)
+{
+ uint32_t rlen;
+ ssize_t stat;
+ spool_hdr hdr;
+ DEV_BLOCK *block = dcr->block;
+
+ rlen = sizeof(hdr);
+ stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
+ if (stat == 0) {
+ Dmsg0(100, "EOT on spool read.\n");
+ return RB_EOT;
+ } else if (stat != (ssize_t)rlen) {
+ if (stat == -1) {
+ berrno be;
+ Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
+ be.strerror());
+ } else {
+ Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
+ Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
+ }
+ return RB_ERROR;
+ }
+ rlen = hdr.len;
+ if (rlen > block->buf_len) {
+ Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
+ Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
+ return RB_ERROR;
+ }
+ stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
+ if (stat != (ssize_t)rlen) {
+ Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
+ Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
+ return RB_ERROR;
+ }
+ /* Setup write pointers */
+ block->binbuf = rlen;
+ block->bufp = block->buf + block->binbuf;
+ block->FirstIndex = hdr.FirstIndex;
+ block->LastIndex = hdr.LastIndex;
+ block->VolSessionId = dcr->jcr->VolSessionId;
+ block->VolSessionTime = dcr->jcr->VolSessionTime;
+ Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
+ return RB_OK;
+}
+
/*
* Write a block to the spool file
*
* Returns: true on success or EOT
- * false on hard error
+ * false on hard error
*/
-bool write_block_to_spool_file(DCR *dcr, DEV_BLOCK *block)
+bool write_block_to_spool_file(DCR *dcr)
{
- ssize_t stat = 0;
- uint32_t wlen; /* length to write */
- DEVICE *dev = dcr->dev;
- int retry = 0;
+ uint32_t wlen, hlen; /* length to write */
+ bool despool = false;
+ DEV_BLOCK *block = dcr->block;
ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
+ if (block->binbuf <= WRITE_BLKHDR_LENGTH) { /* Does block have data in it? */
+ return true;
+ }
+ hlen = sizeof(spool_hdr);
wlen = block->binbuf;
- if (wlen <= WRITE_BLKHDR_LENGTH) { /* Does block have data in it? */
- Dmsg0(100, "return write_block_to_dev no data to write\n");
- return true;
+ P(dcr->dev->spool_mutex);
+ dcr->job_spool_size += hlen + wlen;
+ dcr->dev->spool_size += hlen + wlen;
+ if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
+ (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
+ despool = true;
}
- /*
- * Clear to the end of the buffer if it is not full,
- * and on tape devices, apply min and fixed blocking.
- */
- if (wlen != block->buf_len) {
- uint32_t blen; /* current buffer length */
-
- Dmsg2(200, "binbuf=%d buf_len=%d\n", block->binbuf, block->buf_len);
- blen = wlen;
-
- /* Adjust write size to min/max for tapes only */
- if (dev->state & ST_TAPE) {
- if (wlen < dev->min_block_size) {
- wlen = ((dev->min_block_size + TAPE_BSIZE - 1) / TAPE_BSIZE) * TAPE_BSIZE;
- }
- /* check for fixed block size */
- if (dev->min_block_size == dev->max_block_size) {
- wlen = block->buf_len; /* fixed block size already rounded */
- }
- }
- if (wlen-blen > 0) {
- memset(block->bufp, 0, wlen-blen); /* clear garbage */
+ V(dcr->dev->spool_mutex);
+ P(mutex);
+ spool_stats.data_size += hlen + wlen;
+ if (spool_stats.data_size > spool_stats.max_data_size) {
+ spool_stats.max_data_size = spool_stats.data_size;
+ }
+ V(mutex);
+ if (despool) {
+#ifdef xDEBUG
+ char ec1[30], ec2[30], ec3[30], ec4[30];
+ Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
+ "max_job_size=%s job_size=%s\n",
+ edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
+ edit_uint64_with_commas(dcr->job_spool_size, ec2),
+ edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
+ edit_uint64_with_commas(dcr->dev->spool_size, ec4));
+#endif
+ Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
+ if (!despool_data(dcr, false)) {
+ Pmsg0(000, _("Bad return from despool in write_block.\n"));
+ return false;
}
- }
+ /* Despooling cleared these variables so reset them */
+ P(dcr->dev->spool_mutex);
+ dcr->job_spool_size += hlen + wlen;
+ dcr->dev->spool_size += hlen + wlen;
+ V(dcr->dev->spool_mutex);
+ Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
+ }
- ser_block_header(block);
- Dmsg1(300, "Write block of %u bytes\n", wlen);
-write_again:
- stat = write(dcr->spool_fd, block->buf, (size_t)wlen);
- if (stat != (ssize_t)wlen) {
- if (!despool_data(dcr)) {
- return false;
- }
- if (retry++ > 1) {
- return false;
- }
- goto write_again;
+ if (!write_spool_header(dcr)) {
+ return false;
+ }
+ if (!write_spool_data(dcr)) {
+ return false;
}
+ Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
empty_block(block);
return true;
}
+static bool write_spool_header(DCR *dcr)
+{
+ spool_hdr hdr;
+ ssize_t stat;
+ DEV_BLOCK *block = dcr->block;
+
+ hdr.FirstIndex = block->FirstIndex;
+ hdr.LastIndex = block->LastIndex;
+ hdr.len = block->binbuf;
+
+ /* Write header */
+ for (int retry=0; retry<=1; retry++) {
+ stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
+ if (stat == -1) {
+ berrno be;
+ Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
+ be.strerror());
+ }
+ if (stat != (ssize_t)sizeof(hdr)) {
+ /* If we wrote something, truncate it, then despool */
+ if (stat != -1) {
+#if defined(HAVE_WIN32)
+ boffset_t pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
+#else
+ boffset_t pos = lseek(dcr->spool_fd, (off_t)0, SEEK_CUR);
+#endif
+ if (ftruncate(dcr->spool_fd, pos - stat) != 0) {
+ berrno be;
+ Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
+ be.strerror());
+ return false;
+ }
+ }
+ if (!despool_data(dcr, false)) {
+ Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
+ return false;
+ }
+ continue; /* try again */
+ }
+ return true;
+ }
+ Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
+ return false;
+}
+
+static bool write_spool_data(DCR *dcr)
+{
+ ssize_t stat;
+ DEV_BLOCK *block = dcr->block;
+
+ /* Write data */
+ for (int retry=0; retry<=1; retry++) {
+ stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
+ if (stat == -1) {
+ berrno be;
+ Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
+ be.strerror());
+ }
+ if (stat != (ssize_t)block->binbuf) {
+ /*
+ * If we wrote something, truncate it and the header, then despool
+ */
+ if (stat != -1) {
+#if defined(HAVE_WIN32)
+ boffset_t pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
+#else
+ boffset_t pos = lseek(dcr->spool_fd, (off_t)0, SEEK_CUR);
+#endif
+ if (ftruncate(dcr->spool_fd, pos - stat - sizeof(spool_hdr)) != 0) {
+ berrno be;
+ Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
+ be.strerror());
+ return false;
+ }
+ }
+ if (!despool_data(dcr, false)) {
+ Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
+ return false;
+ }
+ if (!write_spool_header(dcr)) {
+ return false;
+ }
+ continue; /* try again */
+ }
+ return true;
+ }
+ Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
+ return false;
+}
+
bool are_attributes_spooled(JCR *jcr)
return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
}
-int begin_attribute_spool(JCR *jcr)
+/*
+ * Create spool file for attributes.
+ * This is done by "attaching" to the bsock, and when
+ * it is called, the output is written to a file.
+ * The actual spooling is turned on and off in
+ * append.c only during writing of the attributes.
+ */
+bool begin_attribute_spool(JCR *jcr)
{
if (!jcr->no_attributes && jcr->spool_attributes) {
- return open_spool_file(jcr, jcr->dir_bsock);
+ return open_attr_spool_file(jcr, jcr->dir_bsock);
}
- return 1;
+ return true;
}
-int discard_attribute_spool(JCR *jcr)
+bool discard_attribute_spool(JCR *jcr)
{
if (are_attributes_spooled(jcr)) {
- return close_spool_file(jcr, jcr->dir_bsock);
+ return close_attr_spool_file(jcr, jcr->dir_bsock);
+ }
+ return true;
+}
+
+static void update_attr_spool_size(ssize_t size)
+{
+ P(mutex);
+ if (size > 0) {
+ if ((spool_stats.attr_size - size) > 0) {
+ spool_stats.attr_size -= size;
+ } else {
+ spool_stats.attr_size = 0;
+ }
}
- return 1;
+ V(mutex);
}
-int commit_attribute_spool(JCR *jcr)
+bool commit_attribute_spool(JCR *jcr)
{
+ off_t size;
+ char ec1[30];
+
if (are_attributes_spooled(jcr)) {
- bnet_despool_to_bsock(jcr->dir_bsock);
- return close_spool_file(jcr, jcr->dir_bsock);
+ if (fseeko(jcr->dir_bsock->spool_fd, 0, SEEK_END) != 0) {
+ berrno be;
+ Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
+ be.strerror());
+ goto bail_out;
+ }
+ size = ftello(jcr->dir_bsock->spool_fd);
+ if (size < 0) {
+ berrno be;
+ Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
+ be.strerror());
+ goto bail_out;
+ }
+ P(mutex);
+ if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
+ spool_stats.max_attr_size = spool_stats.attr_size + size;
+ }
+ spool_stats.attr_size += size;
+ V(mutex);
+ Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
+ edit_uint64_with_commas(size, ec1));
+ jcr->dir_bsock->despool(update_attr_spool_size, size);
+ return close_attr_spool_file(jcr, jcr->dir_bsock);
}
- return 1;
+ return true;
+
+bail_out:
+ close_attr_spool_file(jcr, jcr->dir_bsock);
+ return false;
+}
+
+static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
+{
+ Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
+ jcr->Job, fd);
+}
+
+
+bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
+{
+ POOLMEM *name = get_pool_memory(PM_MESSAGE);
+
+ make_unique_spool_filename(jcr, &name, bs->fd);
+ bs->spool_fd = fopen(name, "w+b");
+ if (!bs->spool_fd) {
+ berrno be;
+ Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
+ be.strerror());
+ free_pool_memory(name);
+ return false;
+ }
+ P(mutex);
+ spool_stats.attr_jobs++;
+ V(mutex);
+ free_pool_memory(name);
+ return true;
+}
+
+bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
+{
+ POOLMEM *name;
+
+ if (!bs->spool_fd) {
+ return true;
+ }
+ name = get_pool_memory(PM_MESSAGE);
+ P(mutex);
+ spool_stats.attr_jobs--;
+ spool_stats.total_attr_jobs++;
+ V(mutex);
+ make_unique_spool_filename(jcr, &name, bs->fd);
+ fclose(bs->spool_fd);
+ unlink(name);
+ free_pool_memory(name);
+ bs->spool_fd = NULL;
+ bs->spool = false;
+ return true;
}