2 Bacula® - The Network Backup Solution
4 Copyright (C) 2004-2008 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of John Walker.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
31 * Kern Sibbald, March 2004
39 /* Forward referenced subroutines */
40 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
41 static bool open_data_spool_file(DCR *dcr);
42 static bool close_data_spool_file(DCR *dcr);
43 static bool despool_data(DCR *dcr, bool commit);
44 static int read_block_from_spool_file(DCR *dcr);
45 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
46 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
47 static bool write_spool_header(DCR *dcr);
48 static bool write_spool_data(DCR *dcr);
50 struct spool_stats_t {
51 uint32_t data_jobs; /* current jobs spooling data */
53 uint32_t total_data_jobs; /* total jobs to have spooled data */
54 uint32_t total_attr_jobs;
55 int64_t max_data_size; /* max data size */
56 int64_t max_attr_size;
57 int64_t data_size; /* current data size (all jobs running) */
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62 spool_stats_t spool_stats;
65 * Header for data spool record */
67 int32_t FirstIndex; /* FirstIndex for buffer */
68 int32_t LastIndex; /* LastIndex for buffer */
69 uint32_t len; /* length of next buffer */
78 void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
80 char ed1[30], ed2[30];
81 POOL_MEM msg(PM_MESSAGE);
84 len = Mmsg(msg, _("Spooling statistics:\n"));
86 if (spool_stats.data_jobs || spool_stats.max_data_size) {
87 len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
88 spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
89 spool_stats.total_data_jobs,
90 edit_uint64_with_commas(spool_stats.max_data_size, ed2));
92 sendit(msg.c_str(), len, arg);
94 if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
95 len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
96 spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
97 spool_stats.total_attr_jobs,
98 edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
100 sendit(msg.c_str(), len, arg);
104 bool begin_data_spool(DCR *dcr)
107 if (!dcr->dev->is_dvd() && dcr->jcr->spool_data) {
108 Dmsg0(100, "Turning on data spooling\n");
109 dcr->spool_data = true;
110 stat = open_data_spool_file(dcr);
112 dcr->spooling = true;
113 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
115 spool_stats.data_jobs++;
122 bool discard_data_spool(DCR *dcr)
125 Dmsg0(100, "Data spooling discarded\n");
126 return close_data_spool_file(dcr);
131 bool commit_data_spool(DCR *dcr)
136 Dmsg0(100, "Committing spooled data\n");
137 stat = despool_data(dcr, true /*commit*/);
139 Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
140 close_data_spool_file(dcr);
143 return close_data_spool_file(dcr);
148 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
151 if (dcr->dev->device->spool_directory) {
152 dir = dcr->dev->device->spool_directory;
154 dir = working_directory;
156 Mmsg(name, "%s/%s.data.%u.%s.%s.spool", dir, my_name, dcr->jcr->JobId,
157 dcr->jcr->Job, dcr->device->hdr.name);
161 static bool open_data_spool_file(DCR *dcr)
163 POOLMEM *name = get_pool_memory(PM_MESSAGE);
166 make_unique_data_spool_filename(dcr, &name);
167 if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
168 dcr->spool_fd = spool_fd;
169 dcr->jcr->spool_attributes = true;
172 Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
174 free_pool_memory(name);
177 Dmsg1(100, "Created spool file: %s\n", name);
178 free_pool_memory(name);
182 static bool close_data_spool_file(DCR *dcr)
184 POOLMEM *name = get_pool_memory(PM_MESSAGE);
187 spool_stats.data_jobs--;
188 spool_stats.total_data_jobs++;
189 if (spool_stats.data_size < dcr->job_spool_size) {
190 spool_stats.data_size = 0;
192 spool_stats.data_size -= dcr->job_spool_size;
194 dcr->job_spool_size = 0;
197 make_unique_data_spool_filename(dcr, &name);
198 close(dcr->spool_fd);
200 dcr->spooling = false;
202 Dmsg1(100, "Deleted spool file: %s\n", name);
203 free_pool_memory(name);
207 static const char *spool_name = "*spool*";
210 * NB! This routine locks the device, but if committing will
211 * not unlock it. If not committing, it will be unlocked.
213 static bool despool_data(DCR *dcr, bool commit)
223 Dmsg0(100, "Despooling data\n");
225 * Commit means that the job is done, so we commit, otherwise, we
226 * are despooling because of user spool size max or some error
227 * (e.g. filesystem full).
230 Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
231 jcr->dcr->VolumeName,
232 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
233 set_jcr_job_status(jcr, JS_DataCommitting);
235 Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
236 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
237 set_jcr_job_status(jcr, JS_DataDespooling);
239 set_jcr_job_status(jcr, JS_DataDespooling);
240 dir_send_job_status(jcr);
241 dcr->despool_wait = true;
242 dcr->spooling = false;
244 * We work with device blocked, but not locked so that
245 * other threads -- e.g. reservations can lock the device
248 dcr->dblock(BST_DESPOOLING);
249 dcr->despool_wait = false;
250 dcr->despooling = true;
252 set_start_vol_position(dcr);
256 * This is really quite kludgy and should be fixed some time.
257 * We create a dev structure to read from the spool file
260 rdev = (DEVICE *)malloc(sizeof(DEVICE));
261 memset(rdev, 0, sizeof(DEVICE));
262 rdev->dev_name = get_memory(strlen(spool_name)+1);
263 bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
264 rdev->errmsg = get_pool_memory(PM_EMSG);
266 rdev->max_block_size = dcr->dev->max_block_size;
267 rdev->min_block_size = dcr->dev->min_block_size;
268 rdev->device = dcr->dev->device;
269 rdcr = new_dcr(jcr, NULL, rdev);
270 rdcr->spool_fd = dcr->spool_fd;
271 block = dcr->block; /* save block */
272 dcr->block = rdcr->block; /* make read and write block the same */
274 Dmsg1(800, "read/write block size = %d\n", block->buf_len);
275 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
277 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
278 posix_fadvise(rdcr->spool_fd, 0, 0, POSIX_FADV_WILLNEED);
281 /* Add run time, to get current wait time */
282 time_t despool_start = time(NULL) - jcr->run_time;
285 if (job_canceled(jcr)) {
289 stat = read_block_from_spool_file(rdcr);
290 if (stat == RB_EOT) {
292 } else if (stat == RB_ERROR) {
296 ok = write_block_to_device(dcr);
298 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
299 dcr->dev->print_name(), dcr->dev->bstrerror());
301 Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
304 /* Subtracting run_time give us elapsed time - wait_time since we started despooling */
305 time_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
307 if (despool_elapsed <= 0) {
311 Jmsg(dcr->jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s bytes/second\n"),
312 despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
313 edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
315 dcr->block = block; /* reset block */
317 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
318 if (ftruncate(rdcr->spool_fd, 0) != 0) {
320 Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
322 /* Note, try continuing despite ftruncate problem */
326 if (spool_stats.data_size < dcr->job_spool_size) {
327 spool_stats.data_size = 0;
329 spool_stats.data_size -= dcr->job_spool_size;
332 P(dcr->dev->spool_mutex);
333 dcr->dev->spool_size -= dcr->job_spool_size;
334 dcr->job_spool_size = 0; /* zap size in input dcr */
335 V(dcr->dev->spool_mutex);
336 free_memory(rdev->dev_name);
337 free_pool_memory(rdev->errmsg);
338 /* Be careful to NULL the jcr and free rdev after free_dcr() */
343 dcr->spooling = true; /* turn on spooling again */
344 dcr->despooling = false;
347 * We are done, so unblock the device, but if we have done a
348 * commit, leave it locked so that the job cleanup does not
349 * need to wait to release the device (no re-acquire of the lock).
352 unblock_device(dcr->dev);
353 /* If doing a commit, leave the device locked -- unlocked in release_device() */
357 set_jcr_job_status(jcr, JS_Running);
358 dir_send_job_status(jcr);
363 * Read a block from the spool file
365 * Returns RB_OK on success
366 * RB_EOT when file done
369 static int read_block_from_spool_file(DCR *dcr)
374 DEV_BLOCK *block = dcr->block;
377 stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
379 Dmsg0(100, "EOT on spool read.\n");
381 } else if (stat != (ssize_t)rlen) {
384 Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
387 Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
388 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
393 if (rlen > block->buf_len) {
394 Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
395 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
398 stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
399 if (stat != (ssize_t)rlen) {
400 Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
401 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
404 /* Setup write pointers */
405 block->binbuf = rlen;
406 block->bufp = block->buf + block->binbuf;
407 block->FirstIndex = hdr.FirstIndex;
408 block->LastIndex = hdr.LastIndex;
409 block->VolSessionId = dcr->jcr->VolSessionId;
410 block->VolSessionTime = dcr->jcr->VolSessionTime;
411 Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
416 * Write a block to the spool file
418 * Returns: true on success or EOT
419 * false on hard error
421 bool write_block_to_spool_file(DCR *dcr)
423 uint32_t wlen, hlen; /* length to write */
424 bool despool = false;
425 DEV_BLOCK *block = dcr->block;
427 ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
428 if (block->binbuf <= WRITE_BLKHDR_LENGTH) { /* Does block have data in it? */
432 hlen = sizeof(spool_hdr);
433 wlen = block->binbuf;
434 P(dcr->dev->spool_mutex);
435 dcr->job_spool_size += hlen + wlen;
436 dcr->dev->spool_size += hlen + wlen;
437 if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
438 (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
441 V(dcr->dev->spool_mutex);
443 spool_stats.data_size += hlen + wlen;
444 if (spool_stats.data_size > spool_stats.max_data_size) {
445 spool_stats.max_data_size = spool_stats.data_size;
450 char ec1[30], ec2[30], ec3[30], ec4[30];
451 Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
452 "max_job_size=%s job_size=%s\n",
453 edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
454 edit_uint64_with_commas(dcr->job_spool_size, ec2),
455 edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
456 edit_uint64_with_commas(dcr->dev->spool_size, ec4));
458 Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
459 if (!despool_data(dcr, false)) {
460 Pmsg0(000, _("Bad return from despool in write_block.\n"));
463 /* Despooling cleared these variables so reset them */
464 P(dcr->dev->spool_mutex);
465 dcr->job_spool_size += hlen + wlen;
466 dcr->dev->spool_size += hlen + wlen;
467 V(dcr->dev->spool_mutex);
468 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
472 if (!write_spool_header(dcr)) {
475 if (!write_spool_data(dcr)) {
479 Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
484 static bool write_spool_header(DCR *dcr)
488 DEV_BLOCK *block = dcr->block;
490 hdr.FirstIndex = block->FirstIndex;
491 hdr.LastIndex = block->LastIndex;
492 hdr.len = block->binbuf;
495 for (int retry=0; retry<=1; retry++) {
496 stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
499 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
502 if (stat != (ssize_t)sizeof(hdr)) {
503 /* If we wrote something, truncate it, then despool */
505 #if defined(HAVE_WIN32)
506 boffset_t pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
508 boffset_t pos = lseek(dcr->spool_fd, (off_t)0, SEEK_CUR);
510 if (ftruncate(dcr->spool_fd, pos - stat) != 0) {
512 Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
514 /* Note, try continuing despite ftruncate problem */
517 if (!despool_data(dcr, false)) {
518 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
521 continue; /* try again */
525 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
529 static bool write_spool_data(DCR *dcr)
532 DEV_BLOCK *block = dcr->block;
535 for (int retry=0; retry<=1; retry++) {
536 stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
539 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
542 if (stat != (ssize_t)block->binbuf) {
544 * If we wrote something, truncate it and the header, then despool
547 #if defined(HAVE_WIN32)
548 boffset_t pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
550 boffset_t pos = lseek(dcr->spool_fd, (off_t)0, SEEK_CUR);
552 if (ftruncate(dcr->spool_fd, pos - stat - sizeof(spool_hdr)) != 0) {
554 Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
556 /* Note, try continuing despite ftruncate problem */
559 if (!despool_data(dcr, false)) {
560 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
563 if (!write_spool_header(dcr)) {
566 continue; /* try again */
570 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
576 bool are_attributes_spooled(JCR *jcr)
578 return jcr->spool_attributes && jcr->dir_bsock->m_spool_fd;
582 * Create spool file for attributes.
583 * This is done by "attaching" to the bsock, and when
584 * it is called, the output is written to a file.
585 * The actual spooling is turned on and off in
586 * append.c only during writing of the attributes.
588 bool begin_attribute_spool(JCR *jcr)
590 if (!jcr->no_attributes && jcr->spool_attributes) {
591 return open_attr_spool_file(jcr, jcr->dir_bsock);
596 bool discard_attribute_spool(JCR *jcr)
598 if (are_attributes_spooled(jcr)) {
599 return close_attr_spool_file(jcr, jcr->dir_bsock);
604 static void update_attr_spool_size(ssize_t size)
608 if ((spool_stats.attr_size - size) > 0) {
609 spool_stats.attr_size -= size;
611 spool_stats.attr_size = 0;
617 bool commit_attribute_spool(JCR *jcr)
622 if (are_attributes_spooled(jcr)) {
623 if (fseeko(jcr->dir_bsock->m_spool_fd, 0, SEEK_END) != 0) {
625 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
629 size = ftello(jcr->dir_bsock->m_spool_fd);
632 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
637 if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
638 spool_stats.max_attr_size = spool_stats.attr_size + size;
640 spool_stats.attr_size += size;
642 set_jcr_job_status(jcr, JS_AttrDespooling);
643 dir_send_job_status(jcr);
644 Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
645 edit_uint64_with_commas(size, ec1));
646 jcr->dir_bsock->despool(update_attr_spool_size, size);
647 return close_attr_spool_file(jcr, jcr->dir_bsock);
652 close_attr_spool_file(jcr, jcr->dir_bsock);
656 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
658 Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
663 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
665 POOLMEM *name = get_pool_memory(PM_MESSAGE);
667 make_unique_spool_filename(jcr, &name, bs->m_fd);
668 bs->m_spool_fd = fopen(name, "w+b");
669 if (!bs->m_spool_fd) {
671 Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
673 free_pool_memory(name);
677 spool_stats.attr_jobs++;
679 free_pool_memory(name);
683 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
687 if (!bs->m_spool_fd) {
690 name = get_pool_memory(PM_MESSAGE);
692 spool_stats.attr_jobs--;
693 spool_stats.total_attr_jobs++;
695 make_unique_spool_filename(jcr, &name, bs->m_fd);
696 fclose(bs->m_spool_fd);
698 free_pool_memory(name);
699 bs->m_spool_fd = NULL;
700 bs->clear_spooling();