2 Bacula® - The Network Backup Solution
4 Copyright (C) 2004-2014 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from many
7 others, a complete list can be found in the file AUTHORS.
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
14 Bacula® is a registered trademark of Kern Sibbald.
19 * Kern Sibbald, March 2004
26 /* Forward referenced subroutines */
27 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
28 static bool open_data_spool_file(DCR *dcr);
29 static bool close_data_spool_file(DCR *dcr);
30 static bool despool_data(DCR *dcr, bool commit);
31 static int read_block_from_spool_file(DCR *dcr);
32 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
33 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
34 static bool write_spool_header(DCR *dcr);
35 static bool write_spool_data(DCR *dcr);
37 struct spool_stats_t {
38 uint32_t data_jobs; /* current jobs spooling data */
40 uint32_t total_data_jobs; /* total jobs to have spooled data */
41 uint32_t total_attr_jobs;
42 int64_t max_data_size; /* max data size */
43 int64_t max_attr_size;
44 int64_t data_size; /* current data size (all jobs running) */
48 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
49 spool_stats_t spool_stats;
52 * Header for data spool record */
54 int32_t FirstIndex; /* FirstIndex for buffer */
55 int32_t LastIndex; /* LastIndex for buffer */
56 uint32_t len; /* length of next buffer */
65 void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
67 char ed1[30], ed2[30];
68 POOL_MEM msg(PM_MESSAGE);
71 len = Mmsg(msg, _("Spooling statistics:\n"));
73 if (spool_stats.data_jobs || spool_stats.max_data_size) {
74 len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
75 spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
76 spool_stats.total_data_jobs,
77 edit_uint64_with_commas(spool_stats.max_data_size, ed2));
79 sendit(msg.c_str(), len, arg);
81 if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
82 len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
83 spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
84 spool_stats.total_attr_jobs,
85 edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
87 sendit(msg.c_str(), len, arg);
91 bool begin_data_spool(DCR *dcr)
94 if (!dcr->dev->is_dvd() && dcr->jcr->spool_data) {
95 Dmsg0(100, "Turning on data spooling\n");
96 dcr->spool_data = true;
97 stat = open_data_spool_file(dcr);
100 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
102 spool_stats.data_jobs++;
109 bool discard_data_spool(DCR *dcr)
112 Dmsg0(100, "Data spooling discarded\n");
113 return close_data_spool_file(dcr);
118 bool commit_data_spool(DCR *dcr)
123 Dmsg0(100, "Committing spooled data\n");
124 stat = despool_data(dcr, true /*commit*/);
126 Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
127 close_data_spool_file(dcr);
130 return close_data_spool_file(dcr);
135 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
138 if (dcr->dev->device->spool_directory) {
139 dir = dcr->dev->device->spool_directory;
141 dir = working_directory;
143 Mmsg(name, "%s/%s.data.%u.%s.%s.spool", dir, my_name, dcr->jcr->JobId,
144 dcr->jcr->Job, dcr->device->hdr.name);
148 static bool open_data_spool_file(DCR *dcr)
150 POOLMEM *name = get_pool_memory(PM_MESSAGE);
153 make_unique_data_spool_filename(dcr, &name);
154 if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
155 dcr->spool_fd = spool_fd;
156 dcr->jcr->spool_attributes = true;
159 Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
161 free_pool_memory(name);
164 Dmsg1(100, "Created spool file: %s\n", name);
165 free_pool_memory(name);
169 static const char *spool_name = "*spool*";
172 * NB! This routine locks the device, but if committing will
173 * not unlock it. If not committing, it will be unlocked.
175 static bool despool_data(DCR *dcr, bool commit)
185 Dmsg0(100, "Despooling data\n");
186 if (jcr->dcr->job_spool_size == 0) {
187 Jmsg(jcr, M_WARNING, 0, _("Despooling zero bytes. Your disk is probably FULL!\n"));
191 * Commit means that the job is done, so we commit, otherwise, we
192 * are despooling because of user spool size max or some error
193 * (e.g. filesystem full).
196 Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
197 jcr->dcr->VolumeName,
198 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
199 jcr->setJobStatus(JS_DataCommitting);
201 Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
202 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
203 jcr->setJobStatus(JS_DataDespooling);
205 jcr->sendJobStatus(JS_DataDespooling);
206 dcr->despool_wait = true;
207 dcr->spooling = false;
209 * We work with device blocked, but not locked so that
210 * other threads -- e.g. reservations can lock the device
213 dcr->dblock(BST_DESPOOLING);
214 dcr->despool_wait = false;
215 dcr->despooling = true;
218 * This is really quite kludgy and should be fixed some time.
219 * We create a dev structure to read from the spool file
222 rdev = (DEVICE *)malloc(sizeof(DEVICE));
223 memset(rdev, 0, sizeof(DEVICE));
224 rdev->dev_name = get_memory(strlen(spool_name)+1);
225 bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
226 rdev->errmsg = get_pool_memory(PM_EMSG);
228 rdev->max_block_size = dcr->dev->max_block_size;
229 rdev->min_block_size = dcr->dev->min_block_size;
230 rdev->device = dcr->dev->device;
231 rdcr = new_dcr(jcr, NULL, rdev, SD_READ);
232 rdcr->spool_fd = dcr->spool_fd;
233 block = dcr->block; /* save block */
234 dcr->block = rdcr->block; /* make read and write block the same */
236 Dmsg1(800, "read/write block size = %d\n", block->buf_len);
237 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
239 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
240 posix_fadvise(rdcr->spool_fd, 0, 0, POSIX_FADV_WILLNEED);
243 /* Add run time, to get current wait time */
244 int32_t despool_start = time(NULL) - jcr->run_time;
246 set_new_file_parameters(dcr);
249 if (job_canceled(jcr)) {
253 stat = read_block_from_spool_file(rdcr);
254 if (stat == RB_EOT) {
256 } else if (stat == RB_ERROR) {
260 ok = dcr->write_block_to_device();
262 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
263 dcr->dev->print_name(), dcr->dev->bstrerror());
264 Pmsg2(000, "Fatal append error on device %s: ERR=%s\n",
265 dcr->dev->print_name(), dcr->dev->bstrerror());
266 jcr->forceJobStatus(JS_FatalError);
268 Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
271 if (!dir_create_jobmedia_record(dcr)) {
272 Jmsg2(jcr, M_FATAL, 0, _("Could not create JobMedia record for Volume=\"%s\" Job=%s\n"),
273 dcr->getVolCatName(), jcr->Job);
274 jcr->forceJobStatus(JS_FatalError);
276 /* Set new file/block parameters for current dcr */
277 set_new_file_parameters(dcr);
280 * Subtracting run_time give us elapsed time - wait_time since
281 * we started despooling. Note, don't use time_t as it is 32 or 64
282 * bits depending on the OS and doesn't edit with %d
284 int32_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
286 if (despool_elapsed <= 0) {
290 Jmsg(jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s Bytes/second\n"),
291 despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
292 edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
294 dcr->block = block; /* reset block */
296 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
297 if (ftruncate(rdcr->spool_fd, 0) != 0) {
299 Jmsg(jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
301 /* Note, try continuing despite ftruncate problem */
305 if (spool_stats.data_size < dcr->job_spool_size) {
306 spool_stats.data_size = 0;
308 spool_stats.data_size -= dcr->job_spool_size;
311 P(dcr->dev->spool_mutex);
312 dcr->dev->spool_size -= dcr->job_spool_size;
313 dcr->job_spool_size = 0; /* zap size in input dcr */
314 V(dcr->dev->spool_mutex);
315 free_memory(rdev->dev_name);
316 free_pool_memory(rdev->errmsg);
317 /* Be careful to NULL the jcr and free rdev after free_dcr() */
322 dcr->spooling = true; /* turn on spooling again */
323 dcr->despooling = false;
325 * Note, if committing we leave the device blocked. It will be removed in
329 dcr->dev->dunblock();
331 jcr->sendJobStatus(JS_Running);
336 * Read a block from the spool file
338 * Returns RB_OK on success
339 * RB_EOT when file done
342 static int read_block_from_spool_file(DCR *dcr)
347 DEV_BLOCK *block = dcr->block;
351 stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
353 Dmsg0(100, "EOT on spool read.\n");
355 } else if (stat != (ssize_t)rlen) {
358 Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
361 Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
362 Jmsg2(jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
364 jcr->forceJobStatus(JS_FatalError);
368 if (rlen > block->buf_len) {
369 Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
370 Jmsg2(jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
371 jcr->forceJobStatus(JS_FatalError);
374 stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
375 if (stat != (ssize_t)rlen) {
376 Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
377 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
378 jcr->forceJobStatus(JS_FatalError);
381 /* Setup write pointers */
382 block->binbuf = rlen;
383 block->bufp = block->buf + block->binbuf;
384 block->FirstIndex = hdr.FirstIndex;
385 block->LastIndex = hdr.LastIndex;
386 block->VolSessionId = dcr->jcr->VolSessionId;
387 block->VolSessionTime = dcr->jcr->VolSessionTime;
388 Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
393 * Write a block to the spool file
395 * Returns: true on success or EOT
396 * false on hard error
398 bool write_block_to_spool_file(DCR *dcr)
400 uint32_t wlen, hlen; /* length to write */
401 bool despool = false;
402 DEV_BLOCK *block = dcr->block;
404 if (job_canceled(dcr->jcr)) {
407 ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
408 if (block->binbuf <= WRITE_BLKHDR_LENGTH) { /* Does block have data in it? */
412 hlen = sizeof(spool_hdr);
413 wlen = block->binbuf;
414 P(dcr->dev->spool_mutex);
415 dcr->job_spool_size += hlen + wlen;
416 dcr->dev->spool_size += hlen + wlen;
417 if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
418 (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
421 V(dcr->dev->spool_mutex);
423 spool_stats.data_size += hlen + wlen;
424 if (spool_stats.data_size > spool_stats.max_data_size) {
425 spool_stats.max_data_size = spool_stats.data_size;
429 char ec1[30], ec2[30];
430 if (dcr->max_job_spool_size > 0) {
431 Jmsg(dcr->jcr, M_INFO, 0, _("User specified Job spool size reached: "
432 "JobSpoolSize=%s MaxJobSpoolSize=%s\n"),
433 edit_uint64_with_commas(dcr->job_spool_size, ec1),
434 edit_uint64_with_commas(dcr->max_job_spool_size, ec2));
436 Jmsg(dcr->jcr, M_INFO, 0, _("User specified Device spool size reached: "
437 "DevSpoolSize=%s MaxDevSpoolSize=%s\n"),
438 edit_uint64_with_commas(dcr->dev->spool_size, ec1),
439 edit_uint64_with_commas(dcr->dev->max_spool_size, ec2));
442 if (!despool_data(dcr, false)) {
443 Pmsg0(000, _("Bad return from despool in write_block.\n"));
446 /* Despooling cleared these variables so reset them */
447 P(dcr->dev->spool_mutex);
448 dcr->job_spool_size += hlen + wlen;
449 dcr->dev->spool_size += hlen + wlen;
450 V(dcr->dev->spool_mutex);
451 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
455 if (!write_spool_header(dcr)) {
458 if (!write_spool_data(dcr)) {
462 Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
467 static bool write_spool_header(DCR *dcr)
471 DEV_BLOCK *block = dcr->block;
474 hdr.FirstIndex = block->FirstIndex;
475 hdr.LastIndex = block->LastIndex;
476 hdr.len = block->binbuf;
479 for (int retry=0; retry<=1; retry++) {
480 stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
483 Jmsg(jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
485 jcr->forceJobStatus(JS_FatalError);
487 if (stat != (ssize_t)sizeof(hdr)) {
488 Jmsg(jcr, M_ERROR, 0, _("Error writing header to spool file."
489 " Disk probably full. Attempting recovery. Wanted to write=%d got=%d\n"),
490 (int)stat, (int)sizeof(hdr));
491 /* If we wrote something, truncate it, then despool */
493 #if defined(HAVE_WIN32)
494 boffset_t pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
496 boffset_t pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
498 if (ftruncate(dcr->spool_fd, pos - stat) != 0) {
500 Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
502 /* Note, try continuing despite ftruncate problem */
505 if (!despool_data(dcr, false)) {
506 Jmsg(jcr, M_FATAL, 0, _("Fatal despooling error."));
507 jcr->forceJobStatus(JS_FatalError);
510 continue; /* try again */
514 Jmsg(jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
515 jcr->forceJobStatus(JS_FatalError);
519 static bool write_spool_data(DCR *dcr)
522 DEV_BLOCK *block = dcr->block;
526 for (int retry=0; retry<=1; retry++) {
527 stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
530 Jmsg(jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
532 jcr->forceJobStatus(JS_FatalError);
534 if (stat != (ssize_t)block->binbuf) {
536 * If we wrote something, truncate it and the header, then despool
539 #if defined(HAVE_WIN32)
540 boffset_t pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
542 boffset_t pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
544 if (ftruncate(dcr->spool_fd, pos - stat - sizeof(spool_hdr)) != 0) {
546 Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
548 /* Note, try continuing despite ftruncate problem */
551 if (!despool_data(dcr, false)) {
552 Jmsg(jcr, M_FATAL, 0, _("Fatal despooling error."));
553 jcr->forceJobStatus(JS_FatalError);
556 if (!write_spool_header(dcr)) {
559 continue; /* try again */
563 Jmsg(jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
564 jcr->forceJobStatus(JS_FatalError);
568 static bool close_data_spool_file(DCR *dcr)
570 POOLMEM *name = get_pool_memory(PM_MESSAGE);
573 spool_stats.data_jobs--;
574 spool_stats.total_data_jobs++;
575 if (spool_stats.data_size < dcr->job_spool_size) {
576 spool_stats.data_size = 0;
578 spool_stats.data_size -= dcr->job_spool_size;
581 P(dcr->dev->spool_mutex);
582 dcr->job_spool_size = 0;
583 V(dcr->dev->spool_mutex);
585 make_unique_data_spool_filename(dcr, &name);
586 close(dcr->spool_fd);
588 dcr->spooling = false;
590 Dmsg1(100, "Deleted spool file: %s\n", name);
591 free_pool_memory(name);
595 bool are_attributes_spooled(JCR *jcr)
597 return jcr->spool_attributes && jcr->dir_bsock->m_spool_fd;
601 * Create spool file for attributes.
602 * This is done by "attaching" to the bsock, and when
603 * it is called, the output is written to a file.
604 * The actual spooling is turned on and off in
605 * append.c only during writing of the attributes.
607 bool begin_attribute_spool(JCR *jcr)
609 if (!jcr->no_attributes && jcr->spool_attributes) {
610 return open_attr_spool_file(jcr, jcr->dir_bsock);
615 static void update_attr_spool_size(ssize_t size)
619 if ((spool_stats.attr_size - size) > 0) {
620 spool_stats.attr_size -= size;
622 spool_stats.attr_size = 0;
628 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
630 Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
635 * Tell Director where to find the attributes spool file
636 * Note, if we are not on the same machine, the Director will
637 * return an error, and the higher level routine will transmit
638 * the data record by record -- using bsock->despool().
640 static bool blast_attr_spool_file(JCR *jcr, boffset_t size)
642 /* send full spool file name */
643 POOLMEM *name = get_pool_memory(PM_MESSAGE);
644 make_unique_spool_filename(jcr, &name, jcr->dir_bsock->m_fd);
646 jcr->dir_bsock->fsend("BlastAttr Job=%s File=%s\n", jcr->Job, name);
647 free_pool_memory(name);
649 if (jcr->dir_bsock->recv() <= 0) {
650 Jmsg(jcr, M_FATAL, 0, _("Network error on BlastAttributes.\n"));
651 jcr->forceJobStatus(JS_FatalError);
655 if (!bstrcmp(jcr->dir_bsock->msg, "1000 OK BlastAttr\n")) {
661 bool commit_attribute_spool(JCR *jcr)
668 Dmsg1(100, "Commit attributes at %s\n", bstrftimes(tbuf, sizeof(tbuf),
669 (utime_t)time(NULL)));
670 if (are_attributes_spooled(jcr)) {
671 dir = jcr->dir_bsock;
672 if (fseeko(dir->m_spool_fd, 0, SEEK_END) != 0) {
674 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
676 jcr->forceJobStatus(JS_FatalError);
679 size = ftello(dir->m_spool_fd);
682 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
684 jcr->forceJobStatus(JS_FatalError);
688 if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
689 spool_stats.max_attr_size = spool_stats.attr_size + size;
691 spool_stats.attr_size += size;
693 jcr->sendJobStatus(JS_AttrDespooling);
694 Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
695 edit_uint64_with_commas(size, ec1));
697 if (!blast_attr_spool_file(jcr, size)) {
698 /* Can't read spool file from director side,
699 * send content over network.
701 dir->despool(update_attr_spool_size, size);
703 return close_attr_spool_file(jcr, dir);
708 close_attr_spool_file(jcr, dir);
712 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
714 POOLMEM *name = get_pool_memory(PM_MESSAGE);
716 make_unique_spool_filename(jcr, &name, bs->m_fd);
717 bs->m_spool_fd = fopen(name, "w+b");
718 if (!bs->m_spool_fd) {
720 Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
722 jcr->forceJobStatus(JS_FatalError);
723 free_pool_memory(name);
727 spool_stats.attr_jobs++;
729 free_pool_memory(name);
733 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
739 Dmsg1(100, "Close attr spool file at %s\n", bstrftimes(tbuf, sizeof(tbuf),
740 (utime_t)time(NULL)));
741 if (!bs->m_spool_fd) {
744 name = get_pool_memory(PM_MESSAGE);
746 spool_stats.attr_jobs--;
747 spool_stats.total_attr_jobs++;
749 make_unique_spool_filename(jcr, &name, bs->m_fd);
750 fclose(bs->m_spool_fd);
752 free_pool_memory(name);
753 bs->m_spool_fd = NULL;
754 bs->clear_spooling();
758 bool discard_attribute_spool(JCR *jcr)
760 if (are_attributes_spooled(jcr)) {
761 return close_attr_spool_file(jcr, jcr->dir_bsock);