2 Bacula® - The Network Backup Solution
4 Copyright (C) 2004-2009 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
31 * Kern Sibbald, March 2004
39 /* Forward referenced subroutines */
40 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
41 static bool open_data_spool_file(DCR *dcr);
42 static bool close_data_spool_file(DCR *dcr);
43 static bool despool_data(DCR *dcr, bool commit);
44 static int read_block_from_spool_file(DCR *dcr);
45 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
46 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
47 static bool write_spool_header(DCR *dcr);
48 static bool write_spool_data(DCR *dcr);
50 struct spool_stats_t {
51 uint32_t data_jobs; /* current jobs spooling data */
53 uint32_t total_data_jobs; /* total jobs to have spooled data */
54 uint32_t total_attr_jobs;
55 int64_t max_data_size; /* max data size */
56 int64_t max_attr_size;
57 int64_t data_size; /* current data size (all jobs running) */
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62 spool_stats_t spool_stats;
65 * Header for data spool record */
67 int32_t FirstIndex; /* FirstIndex for buffer */
68 int32_t LastIndex; /* LastIndex for buffer */
69 uint32_t len; /* length of next buffer */
78 void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
80 char ed1[30], ed2[30];
81 POOL_MEM msg(PM_MESSAGE);
84 len = Mmsg(msg, _("Spooling statistics:\n"));
86 if (spool_stats.data_jobs || spool_stats.max_data_size) {
87 len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
88 spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
89 spool_stats.total_data_jobs,
90 edit_uint64_with_commas(spool_stats.max_data_size, ed2));
92 sendit(msg.c_str(), len, arg);
94 if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
95 len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
96 spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
97 spool_stats.total_attr_jobs,
98 edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
100 sendit(msg.c_str(), len, arg);
104 bool begin_data_spool(DCR *dcr)
107 if (!dcr->dev->is_dvd() && dcr->jcr->spool_data) {
108 Dmsg0(100, "Turning on data spooling\n");
109 dcr->spool_data = true;
110 stat = open_data_spool_file(dcr);
112 dcr->spooling = true;
113 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
115 spool_stats.data_jobs++;
122 bool discard_data_spool(DCR *dcr)
125 Dmsg0(100, "Data spooling discarded\n");
126 return close_data_spool_file(dcr);
131 bool commit_data_spool(DCR *dcr)
136 Dmsg0(100, "Committing spooled data\n");
137 stat = despool_data(dcr, true /*commit*/);
139 Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
140 close_data_spool_file(dcr);
143 return close_data_spool_file(dcr);
148 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
151 if (dcr->dev->device->spool_directory) {
152 dir = dcr->dev->device->spool_directory;
154 dir = working_directory;
156 Mmsg(name, "%s/%s.data.%u.%s.%s.spool", dir, my_name, dcr->jcr->JobId,
157 dcr->jcr->Job, dcr->device->hdr.name);
161 static bool open_data_spool_file(DCR *dcr)
163 POOLMEM *name = get_pool_memory(PM_MESSAGE);
166 make_unique_data_spool_filename(dcr, &name);
167 if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
168 dcr->spool_fd = spool_fd;
169 dcr->jcr->spool_attributes = true;
172 Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
174 free_pool_memory(name);
177 Dmsg1(100, "Created spool file: %s\n", name);
178 free_pool_memory(name);
182 static bool close_data_spool_file(DCR *dcr)
184 POOLMEM *name = get_pool_memory(PM_MESSAGE);
187 spool_stats.data_jobs--;
188 spool_stats.total_data_jobs++;
189 if (spool_stats.data_size < dcr->job_spool_size) {
190 spool_stats.data_size = 0;
192 spool_stats.data_size -= dcr->job_spool_size;
194 dcr->job_spool_size = 0;
197 make_unique_data_spool_filename(dcr, &name);
198 close(dcr->spool_fd);
200 dcr->spooling = false;
202 Dmsg1(100, "Deleted spool file: %s\n", name);
203 free_pool_memory(name);
207 static const char *spool_name = "*spool*";
210 * NB! This routine locks the device, but if committing will
211 * not unlock it. If not committing, it will be unlocked.
213 static bool despool_data(DCR *dcr, bool commit)
223 Dmsg0(100, "Despooling data\n");
225 * Commit means that the job is done, so we commit, otherwise, we
226 * are despooling because of user spool size max or some error
227 * (e.g. filesystem full).
230 Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
231 jcr->dcr->VolumeName,
232 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
233 set_jcr_job_status(jcr, JS_DataCommitting);
235 Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
236 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
237 set_jcr_job_status(jcr, JS_DataDespooling);
239 set_jcr_job_status(jcr, JS_DataDespooling);
240 dir_send_job_status(jcr);
241 dcr->despool_wait = true;
242 dcr->spooling = false;
244 * We work with device blocked, but not locked so that
245 * other threads -- e.g. reservations can lock the device
248 dcr->dblock(BST_DESPOOLING);
249 dcr->despool_wait = false;
250 dcr->despooling = true;
253 * This is really quite kludgy and should be fixed some time.
254 * We create a dev structure to read from the spool file
257 rdev = (DEVICE *)malloc(sizeof(DEVICE));
258 memset(rdev, 0, sizeof(DEVICE));
259 rdev->dev_name = get_memory(strlen(spool_name)+1);
260 bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
261 rdev->errmsg = get_pool_memory(PM_EMSG);
263 rdev->max_block_size = dcr->dev->max_block_size;
264 rdev->min_block_size = dcr->dev->min_block_size;
265 rdev->device = dcr->dev->device;
266 rdcr = new_dcr(jcr, NULL, rdev);
267 rdcr->spool_fd = dcr->spool_fd;
268 block = dcr->block; /* save block */
269 dcr->block = rdcr->block; /* make read and write block the same */
271 Dmsg1(800, "read/write block size = %d\n", block->buf_len);
272 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
274 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
275 posix_fadvise(rdcr->spool_fd, 0, 0, POSIX_FADV_WILLNEED);
278 /* Add run time, to get current wait time */
279 int32_t despool_start = time(NULL) - jcr->run_time;
281 set_new_file_parameters(dcr);
284 if (job_canceled(jcr)) {
288 stat = read_block_from_spool_file(rdcr);
289 if (stat == RB_EOT) {
291 } else if (stat == RB_ERROR) {
295 ok = write_block_to_device(dcr);
297 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
298 dcr->dev->print_name(), dcr->dev->bstrerror());
299 Dmsg2(000, "Fatal append error on device %s: ERR=%s\n",
300 dcr->dev->print_name(), dcr->dev->bstrerror());
302 Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
305 if (!dir_create_jobmedia_record(dcr)) {
306 Jmsg2(jcr, M_FATAL, 0, _("Could not create JobMedia record for Volume=\"%s\" Job=%s\n"),
307 dcr->VolCatInfo.VolCatName, jcr->Job);
309 /* Set new file/block parameters for current dcr */
310 set_new_file_parameters(dcr);
313 * Subtracting run_time give us elapsed time - wait_time since
314 * we started despooling. Note, don't use time_t as it is 32 or 64
315 * bits depending on the OS and doesn't edit with %d
317 int32_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
319 if (despool_elapsed <= 0) {
323 Jmsg(dcr->jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s bytes/second\n"),
324 despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
325 edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
327 dcr->block = block; /* reset block */
329 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
330 if (ftruncate(rdcr->spool_fd, 0) != 0) {
332 Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
334 /* Note, try continuing despite ftruncate problem */
338 if (spool_stats.data_size < dcr->job_spool_size) {
339 spool_stats.data_size = 0;
341 spool_stats.data_size -= dcr->job_spool_size;
344 P(dcr->dev->spool_mutex);
345 dcr->dev->spool_size -= dcr->job_spool_size;
346 dcr->job_spool_size = 0; /* zap size in input dcr */
347 V(dcr->dev->spool_mutex);
348 free_memory(rdev->dev_name);
349 free_pool_memory(rdev->errmsg);
350 /* Be careful to NULL the jcr and free rdev after free_dcr() */
355 dcr->spooling = true; /* turn on spooling again */
356 dcr->despooling = false;
359 * We are done, so unblock the device, but if we have done a
360 * commit, leave it locked so that the job cleanup does not
361 * need to wait to release the device (no re-acquire of the lock).
364 unblock_device(dcr->dev);
365 /* If doing a commit, leave the device locked -- unlocked in release_device() */
369 set_jcr_job_status(jcr, JS_Running);
370 dir_send_job_status(jcr);
375 * Read a block from the spool file
377 * Returns RB_OK on success
378 * RB_EOT when file done
381 static int read_block_from_spool_file(DCR *dcr)
386 DEV_BLOCK *block = dcr->block;
389 stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
391 Dmsg0(100, "EOT on spool read.\n");
393 } else if (stat != (ssize_t)rlen) {
396 Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
399 Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
400 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
405 if (rlen > block->buf_len) {
406 Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
407 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
410 stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
411 if (stat != (ssize_t)rlen) {
412 Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
413 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
416 /* Setup write pointers */
417 block->binbuf = rlen;
418 block->bufp = block->buf + block->binbuf;
419 block->FirstIndex = hdr.FirstIndex;
420 block->LastIndex = hdr.LastIndex;
421 block->VolSessionId = dcr->jcr->VolSessionId;
422 block->VolSessionTime = dcr->jcr->VolSessionTime;
423 Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
428 * Write a block to the spool file
430 * Returns: true on success or EOT
431 * false on hard error
433 bool write_block_to_spool_file(DCR *dcr)
435 uint32_t wlen, hlen; /* length to write */
436 bool despool = false;
437 DEV_BLOCK *block = dcr->block;
439 if (job_canceled(dcr->jcr)) {
442 ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
443 if (block->binbuf <= WRITE_BLKHDR_LENGTH) { /* Does block have data in it? */
447 hlen = sizeof(spool_hdr);
448 wlen = block->binbuf;
449 P(dcr->dev->spool_mutex);
450 dcr->job_spool_size += hlen + wlen;
451 dcr->dev->spool_size += hlen + wlen;
452 if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
453 (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
456 V(dcr->dev->spool_mutex);
458 spool_stats.data_size += hlen + wlen;
459 if (spool_stats.data_size > spool_stats.max_data_size) {
460 spool_stats.max_data_size = spool_stats.data_size;
465 char ec1[30], ec2[30], ec3[30], ec4[30];
466 Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
467 "max_job_size=%s job_size=%s\n",
468 edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
469 edit_uint64_with_commas(dcr->job_spool_size, ec2),
470 edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
471 edit_uint64_with_commas(dcr->dev->spool_size, ec4));
473 Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
474 if (!despool_data(dcr, false)) {
475 Pmsg0(000, _("Bad return from despool in write_block.\n"));
478 /* Despooling cleared these variables so reset them */
479 P(dcr->dev->spool_mutex);
480 dcr->job_spool_size += hlen + wlen;
481 dcr->dev->spool_size += hlen + wlen;
482 V(dcr->dev->spool_mutex);
483 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
487 if (!write_spool_header(dcr)) {
490 if (!write_spool_data(dcr)) {
494 Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
499 static bool write_spool_header(DCR *dcr)
503 DEV_BLOCK *block = dcr->block;
505 hdr.FirstIndex = block->FirstIndex;
506 hdr.LastIndex = block->LastIndex;
507 hdr.len = block->binbuf;
510 for (int retry=0; retry<=1; retry++) {
511 stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
514 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
517 if (stat != (ssize_t)sizeof(hdr)) {
518 /* If we wrote something, truncate it, then despool */
520 #if defined(HAVE_WIN32)
521 boffset_t pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
523 boffset_t pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
525 if (ftruncate(dcr->spool_fd, pos - stat) != 0) {
527 Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
529 /* Note, try continuing despite ftruncate problem */
532 if (!despool_data(dcr, false)) {
533 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
536 continue; /* try again */
540 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
544 static bool write_spool_data(DCR *dcr)
547 DEV_BLOCK *block = dcr->block;
550 for (int retry=0; retry<=1; retry++) {
551 stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
554 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
557 if (stat != (ssize_t)block->binbuf) {
559 * If we wrote something, truncate it and the header, then despool
562 #if defined(HAVE_WIN32)
563 boffset_t pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
565 boffset_t pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
567 if (ftruncate(dcr->spool_fd, pos - stat - sizeof(spool_hdr)) != 0) {
569 Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
571 /* Note, try continuing despite ftruncate problem */
574 if (!despool_data(dcr, false)) {
575 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
578 if (!write_spool_header(dcr)) {
581 continue; /* try again */
585 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
591 bool are_attributes_spooled(JCR *jcr)
593 return jcr->spool_attributes && jcr->dir_bsock->m_spool_fd;
597 * Create spool file for attributes.
598 * This is done by "attaching" to the bsock, and when
599 * it is called, the output is written to a file.
600 * The actual spooling is turned on and off in
601 * append.c only during writing of the attributes.
603 bool begin_attribute_spool(JCR *jcr)
605 if (!jcr->no_attributes && jcr->spool_attributes) {
606 return open_attr_spool_file(jcr, jcr->dir_bsock);
611 bool discard_attribute_spool(JCR *jcr)
613 if (are_attributes_spooled(jcr)) {
614 return close_attr_spool_file(jcr, jcr->dir_bsock);
619 static void update_attr_spool_size(ssize_t size)
623 if ((spool_stats.attr_size - size) > 0) {
624 spool_stats.attr_size -= size;
626 spool_stats.attr_size = 0;
632 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
634 Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
638 static bool blast_attr_spool_file(JCR *jcr, boffset_t size)
640 /* send full spool file name */
641 POOLMEM *name = get_pool_memory(PM_MESSAGE);
642 make_unique_spool_filename(jcr, &name, jcr->dir_bsock->m_fd);
644 jcr->dir_bsock->fsend("BlastAttr Job=%s File=%s\n",
646 free_pool_memory(name);
648 if (jcr->dir_bsock->recv() <= 0) {
649 Jmsg(jcr, M_FATAL, 0, _("Network error on BlastAttributes.\n"));
653 if (!bstrcmp(jcr->dir_bsock->msg, "1000 OK BlastAttr\n")) {
659 bool commit_attribute_spool(JCR *jcr)
665 Dmsg1(100, "Commit attributes at %s\n", bstrftimes(tbuf, sizeof(tbuf),
666 (utime_t)time(NULL)));
667 if (are_attributes_spooled(jcr)) {
668 if (fseeko(jcr->dir_bsock->m_spool_fd, 0, SEEK_END) != 0) {
670 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
674 size = ftello(jcr->dir_bsock->m_spool_fd);
677 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
682 if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
683 spool_stats.max_attr_size = spool_stats.attr_size + size;
685 spool_stats.attr_size += size;
687 set_jcr_job_status(jcr, JS_AttrDespooling);
688 dir_send_job_status(jcr);
689 Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
690 edit_uint64_with_commas(size, ec1));
692 if (!blast_attr_spool_file(jcr, size)) {
693 /* Can't read spool file from director side,
694 * send content over network.
696 jcr->dir_bsock->despool(update_attr_spool_size, size);
698 return close_attr_spool_file(jcr, jcr->dir_bsock);
703 close_attr_spool_file(jcr, jcr->dir_bsock);
707 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
709 POOLMEM *name = get_pool_memory(PM_MESSAGE);
711 make_unique_spool_filename(jcr, &name, bs->m_fd);
712 bs->m_spool_fd = fopen(name, "w+b");
713 if (!bs->m_spool_fd) {
715 Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
717 free_pool_memory(name);
721 spool_stats.attr_jobs++;
723 free_pool_memory(name);
727 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
733 Dmsg1(100, "Close attr spool file at %s\n", bstrftimes(tbuf, sizeof(tbuf),
734 (utime_t)time(NULL)));
735 if (!bs->m_spool_fd) {
738 name = get_pool_memory(PM_MESSAGE);
740 spool_stats.attr_jobs--;
741 spool_stats.total_attr_jobs++;
743 make_unique_spool_filename(jcr, &name, bs->m_fd);
744 fclose(bs->m_spool_fd);
746 free_pool_memory(name);
747 bs->m_spool_fd = NULL;
748 bs->clear_spooling();