2 Bacula® - The Network Backup Solution
4 Copyright (C) 2004-2007 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of John Walker.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
31 * Kern Sibbald, March 2004
39 /* Forward referenced subroutines */
40 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
41 static bool open_data_spool_file(DCR *dcr);
42 static bool close_data_spool_file(DCR *dcr);
43 static bool despool_data(DCR *dcr, bool commit);
44 static int read_block_from_spool_file(DCR *dcr);
45 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
46 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
47 static bool write_spool_header(DCR *dcr);
48 static bool write_spool_data(DCR *dcr);
50 struct spool_stats_t {
51 uint32_t data_jobs; /* current jobs spooling data */
53 uint32_t total_data_jobs; /* total jobs to have spooled data */
54 uint32_t total_attr_jobs;
55 int64_t max_data_size; /* max data size */
56 int64_t max_attr_size;
57 int64_t data_size; /* current data size (all jobs running) */
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62 spool_stats_t spool_stats;
65 * Header for data spool record */
67 int32_t FirstIndex; /* FirstIndex for buffer */
68 int32_t LastIndex; /* LastIndex for buffer */
69 uint32_t len; /* length of next buffer */
78 void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
80 char ed1[30], ed2[30];
81 POOL_MEM msg(PM_MESSAGE);
84 len = Mmsg(msg, _("Spooling statistics:\n"));
86 if (spool_stats.data_jobs || spool_stats.max_data_size) {
87 len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
88 spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
89 spool_stats.total_data_jobs,
90 edit_uint64_with_commas(spool_stats.max_data_size, ed2));
92 sendit(msg.c_str(), len, arg);
94 if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
95 len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
96 spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
97 spool_stats.total_attr_jobs,
98 edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
100 sendit(msg.c_str(), len, arg);
102 len = Mmsg(msg, "====\n");
103 sendit(msg.c_str(), len, arg);
106 bool begin_data_spool(DCR *dcr)
109 if (!dcr->dev->is_dvd() && dcr->jcr->spool_data) {
110 Dmsg0(100, "Turning on data spooling\n");
111 dcr->spool_data = true;
112 stat = open_data_spool_file(dcr);
114 dcr->spooling = true;
115 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
117 spool_stats.data_jobs++;
124 bool discard_data_spool(DCR *dcr)
127 Dmsg0(100, "Data spooling discarded\n");
128 return close_data_spool_file(dcr);
133 bool commit_data_spool(DCR *dcr)
138 Dmsg0(100, "Committing spooled data\n");
139 stat = despool_data(dcr, true /*commit*/);
141 Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
142 close_data_spool_file(dcr);
145 return close_data_spool_file(dcr);
150 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
153 if (dcr->dev->device->spool_directory) {
154 dir = dcr->dev->device->spool_directory;
156 dir = working_directory;
158 Mmsg(name, "%s/%s.data.%u.%s.%s.spool", dir, my_name, dcr->jcr->JobId,
159 dcr->jcr->Job, dcr->device->hdr.name);
163 static bool open_data_spool_file(DCR *dcr)
165 POOLMEM *name = get_pool_memory(PM_MESSAGE);
168 make_unique_data_spool_filename(dcr, &name);
169 if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
170 dcr->spool_fd = spool_fd;
171 dcr->jcr->spool_attributes = true;
174 Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
176 free_pool_memory(name);
179 Dmsg1(100, "Created spool file: %s\n", name);
180 free_pool_memory(name);
184 static bool close_data_spool_file(DCR *dcr)
186 POOLMEM *name = get_pool_memory(PM_MESSAGE);
189 spool_stats.data_jobs--;
190 spool_stats.total_data_jobs++;
191 if (spool_stats.data_size < dcr->job_spool_size) {
192 spool_stats.data_size = 0;
194 spool_stats.data_size -= dcr->job_spool_size;
196 dcr->job_spool_size = 0;
199 make_unique_data_spool_filename(dcr, &name);
200 close(dcr->spool_fd);
202 dcr->spooling = false;
204 Dmsg1(100, "Deleted spool file: %s\n", name);
205 free_pool_memory(name);
209 static const char *spool_name = "*spool*";
212 * NB! This routine locks the device, but if committing will
213 * not unlock it. If not committing, it will be unlocked.
215 static bool despool_data(DCR *dcr, bool commit)
225 Dmsg0(100, "Despooling data\n");
227 * Commit means that the job is done, so we commit, otherwise, we
228 * are despooling because of user spool size max or some error
229 * (e.g. filesystem full).
232 Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
233 jcr->dcr->VolumeName,
234 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
235 set_jcr_job_status(jcr, JS_DataCommitting);
237 Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
238 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
239 set_jcr_job_status(jcr, JS_DataDespooling);
241 set_jcr_job_status(jcr, JS_DataDespooling);
242 dir_send_job_status(jcr);
243 dcr->despool_wait = true;
244 dcr->spooling = false;
246 * We work with device blocked, but not locked so that
247 * other threads -- e.g. reservations can lock the device
250 dcr->dblock(BST_DESPOOLING);
251 dcr->despool_wait = false;
252 dcr->despooling = true;
255 * This is really quite kludgy and should be fixed some time.
256 * We create a dev structure to read from the spool file
259 rdev = (DEVICE *)malloc(sizeof(DEVICE));
260 memset(rdev, 0, sizeof(DEVICE));
261 rdev->dev_name = get_memory(strlen(spool_name)+1);
262 bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
263 rdev->errmsg = get_pool_memory(PM_EMSG);
265 rdev->max_block_size = dcr->dev->max_block_size;
266 rdev->min_block_size = dcr->dev->min_block_size;
267 rdev->device = dcr->dev->device;
268 rdcr = new_dcr(jcr, NULL, rdev);
269 rdcr->spool_fd = dcr->spool_fd;
270 block = dcr->block; /* save block */
271 dcr->block = rdcr->block; /* make read and write block the same */
273 Dmsg1(800, "read/write block size = %d\n", block->buf_len);
274 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
276 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
277 posix_fadvise(rdcr->spool_fd, 0, 0, POSIX_FADV_WILLNEED);
280 /* Add run time, to get current wait time */
281 time_t despool_start = time(NULL) - jcr->run_time;
284 if (job_canceled(jcr)) {
288 stat = read_block_from_spool_file(rdcr);
289 if (stat == RB_EOT) {
291 } else if (stat == RB_ERROR) {
295 ok = write_block_to_device(dcr);
297 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
298 dcr->dev->print_name(), dcr->dev->bstrerror());
300 Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
303 /* Subtracting run_time give us elapsed time - wait_time since we started despooling */
304 time_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
306 if (despool_elapsed <= 0) {
310 Jmsg(dcr->jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s bytes/second\n"),
311 despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
312 edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
314 dcr->block = block; /* reset block */
316 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
317 if (ftruncate(rdcr->spool_fd, 0) != 0) {
319 Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
321 /* Note, try continuing despite ftruncate problem */
325 if (spool_stats.data_size < dcr->job_spool_size) {
326 spool_stats.data_size = 0;
328 spool_stats.data_size -= dcr->job_spool_size;
331 P(dcr->dev->spool_mutex);
332 dcr->dev->spool_size -= dcr->job_spool_size;
333 dcr->job_spool_size = 0; /* zap size in input dcr */
334 V(dcr->dev->spool_mutex);
335 free_memory(rdev->dev_name);
336 free_pool_memory(rdev->errmsg);
337 /* Be careful to NULL the jcr and free rdev after free_dcr() */
342 dcr->spooling = true; /* turn on spooling again */
343 dcr->despooling = false;
346 * We are done, so unblock the device, but if we have done a
347 * commit, leave it locked so that the job cleanup does not
348 * need to wait to release the device (no re-acquire of the lock).
351 unblock_device(dcr->dev);
352 /* If doing a commit, leave the device locked -- unlocked in release_device() */
356 set_jcr_job_status(jcr, JS_Running);
357 dir_send_job_status(jcr);
362 * Read a block from the spool file
364 * Returns RB_OK on success
365 * RB_EOT when file done
368 static int read_block_from_spool_file(DCR *dcr)
373 DEV_BLOCK *block = dcr->block;
376 stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
378 Dmsg0(100, "EOT on spool read.\n");
380 } else if (stat != (ssize_t)rlen) {
383 Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
386 Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
387 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
392 if (rlen > block->buf_len) {
393 Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
394 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
397 stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
398 if (stat != (ssize_t)rlen) {
399 Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
400 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
403 /* Setup write pointers */
404 block->binbuf = rlen;
405 block->bufp = block->buf + block->binbuf;
406 block->FirstIndex = hdr.FirstIndex;
407 block->LastIndex = hdr.LastIndex;
408 block->VolSessionId = dcr->jcr->VolSessionId;
409 block->VolSessionTime = dcr->jcr->VolSessionTime;
410 Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
415 * Write a block to the spool file
417 * Returns: true on success or EOT
418 * false on hard error
420 bool write_block_to_spool_file(DCR *dcr)
422 uint32_t wlen, hlen; /* length to write */
423 bool despool = false;
424 DEV_BLOCK *block = dcr->block;
426 ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
427 if (block->binbuf <= WRITE_BLKHDR_LENGTH) { /* Does block have data in it? */
431 hlen = sizeof(spool_hdr);
432 wlen = block->binbuf;
433 P(dcr->dev->spool_mutex);
434 dcr->job_spool_size += hlen + wlen;
435 dcr->dev->spool_size += hlen + wlen;
436 if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
437 (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
440 V(dcr->dev->spool_mutex);
442 spool_stats.data_size += hlen + wlen;
443 if (spool_stats.data_size > spool_stats.max_data_size) {
444 spool_stats.max_data_size = spool_stats.data_size;
449 char ec1[30], ec2[30], ec3[30], ec4[30];
450 Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
451 "max_job_size=%s job_size=%s\n",
452 edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
453 edit_uint64_with_commas(dcr->job_spool_size, ec2),
454 edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
455 edit_uint64_with_commas(dcr->dev->spool_size, ec4));
457 Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
458 if (!despool_data(dcr, false)) {
459 Pmsg0(000, _("Bad return from despool in write_block.\n"));
462 /* Despooling cleared these variables so reset them */
463 P(dcr->dev->spool_mutex);
464 dcr->job_spool_size += hlen + wlen;
465 dcr->dev->spool_size += hlen + wlen;
466 V(dcr->dev->spool_mutex);
467 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
471 if (!write_spool_header(dcr)) {
474 if (!write_spool_data(dcr)) {
478 Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
483 static bool write_spool_header(DCR *dcr)
487 DEV_BLOCK *block = dcr->block;
489 hdr.FirstIndex = block->FirstIndex;
490 hdr.LastIndex = block->LastIndex;
491 hdr.len = block->binbuf;
494 for (int retry=0; retry<=1; retry++) {
495 stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
498 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
501 if (stat != (ssize_t)sizeof(hdr)) {
502 /* If we wrote something, truncate it, then despool */
504 #if defined(HAVE_WIN32)
505 boffset_t pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
507 boffset_t pos = lseek(dcr->spool_fd, (off_t)0, SEEK_CUR);
509 if (ftruncate(dcr->spool_fd, pos - stat) != 0) {
511 Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
513 /* Note, try continuing despite ftruncate problem */
516 if (!despool_data(dcr, false)) {
517 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
520 continue; /* try again */
524 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
528 static bool write_spool_data(DCR *dcr)
531 DEV_BLOCK *block = dcr->block;
534 for (int retry=0; retry<=1; retry++) {
535 stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
538 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
541 if (stat != (ssize_t)block->binbuf) {
543 * If we wrote something, truncate it and the header, then despool
546 #if defined(HAVE_WIN32)
547 boffset_t pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
549 boffset_t pos = lseek(dcr->spool_fd, (off_t)0, SEEK_CUR);
551 if (ftruncate(dcr->spool_fd, pos - stat - sizeof(spool_hdr)) != 0) {
553 Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
555 /* Note, try continuing despite ftruncate problem */
558 if (!despool_data(dcr, false)) {
559 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
562 if (!write_spool_header(dcr)) {
565 continue; /* try again */
569 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
575 bool are_attributes_spooled(JCR *jcr)
577 return jcr->spool_attributes && jcr->dir_bsock->m_spool_fd;
581 * Create spool file for attributes.
582 * This is done by "attaching" to the bsock, and when
583 * it is called, the output is written to a file.
584 * The actual spooling is turned on and off in
585 * append.c only during writing of the attributes.
587 bool begin_attribute_spool(JCR *jcr)
589 if (!jcr->no_attributes && jcr->spool_attributes) {
590 return open_attr_spool_file(jcr, jcr->dir_bsock);
595 bool discard_attribute_spool(JCR *jcr)
597 if (are_attributes_spooled(jcr)) {
598 return close_attr_spool_file(jcr, jcr->dir_bsock);
603 static void update_attr_spool_size(ssize_t size)
607 if ((spool_stats.attr_size - size) > 0) {
608 spool_stats.attr_size -= size;
610 spool_stats.attr_size = 0;
616 bool commit_attribute_spool(JCR *jcr)
621 if (are_attributes_spooled(jcr)) {
622 if (fseeko(jcr->dir_bsock->m_spool_fd, 0, SEEK_END) != 0) {
624 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
628 size = ftello(jcr->dir_bsock->m_spool_fd);
631 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
636 if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
637 spool_stats.max_attr_size = spool_stats.attr_size + size;
639 spool_stats.attr_size += size;
641 set_jcr_job_status(jcr, JS_AttrDespooling);
642 dir_send_job_status(jcr);
643 Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
644 edit_uint64_with_commas(size, ec1));
645 jcr->dir_bsock->despool(update_attr_spool_size, size);
646 return close_attr_spool_file(jcr, jcr->dir_bsock);
651 close_attr_spool_file(jcr, jcr->dir_bsock);
655 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
657 Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
662 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
664 POOLMEM *name = get_pool_memory(PM_MESSAGE);
666 make_unique_spool_filename(jcr, &name, bs->m_fd);
667 bs->m_spool_fd = fopen(name, "w+b");
668 if (!bs->m_spool_fd) {
670 Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
672 free_pool_memory(name);
676 spool_stats.attr_jobs++;
678 free_pool_memory(name);
682 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
686 if (!bs->m_spool_fd) {
689 name = get_pool_memory(PM_MESSAGE);
691 spool_stats.attr_jobs--;
692 spool_stats.total_attr_jobs++;
694 make_unique_spool_filename(jcr, &name, bs->m_fd);
695 fclose(bs->m_spool_fd);
697 free_pool_memory(name);
698 bs->m_spool_fd = NULL;
699 bs->clear_spooling();