4 * Kern Sibbald, March 2004
9 Bacula® - The Network Backup Solution
11 Copyright (C) 2004-2006 Free Software Foundation Europe e.V.
13 The main author of Bacula is Kern Sibbald, with contributions from
14 many others, a complete list can be found in the file AUTHORS.
15 This program is Free Software; you can redistribute it and/or
16 modify it under the terms of version two of the GNU General Public
17 License as published by the Free Software Foundation plus additions
18 that are listed in the file LICENSE.
20 This program is distributed in the hope that it will be useful, but
21 WITHOUT ANY WARRANTY; without even the implied warranty of
22 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
23 General Public License for more details.
25 You should have received a copy of the GNU General Public License
26 along with this program; if not, write to the Free Software
27 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
30 Bacula® is a registered trademark of John Walker.
31 The licensor of Bacula is the Free Software Foundation Europe
32 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
33 Switzerland, email:ftf@fsfeurope.org.
39 /* Forward referenced subroutines */
40 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
41 static bool open_data_spool_file(DCR *dcr);
42 static bool close_data_spool_file(DCR *dcr);
43 static bool despool_data(DCR *dcr, bool commit);
44 static int read_block_from_spool_file(DCR *dcr);
45 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
46 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
47 static bool write_spool_header(DCR *dcr);
48 static bool write_spool_data(DCR *dcr);
50 struct spool_stats_t {
51 uint32_t data_jobs; /* current jobs spooling data */
53 uint32_t total_data_jobs; /* total jobs to have spooled data */
54 uint32_t total_attr_jobs;
55 int64_t max_data_size; /* max data size */
56 int64_t max_attr_size;
57 int64_t data_size; /* current data size (all jobs running) */
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62 spool_stats_t spool_stats;
65 * Header for data spool record */
67 int32_t FirstIndex; /* FirstIndex for buffer */
68 int32_t LastIndex; /* LastIndex for buffer */
69 uint32_t len; /* length of next buffer */
78 void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
80 char *msg, ed1[30], ed2[30];
83 msg = (char *)get_pool_memory(PM_MESSAGE);
85 if (spool_stats.data_jobs || spool_stats.max_data_size) {
86 len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
87 spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
88 spool_stats.total_data_jobs,
89 edit_uint64_with_commas(spool_stats.max_data_size, ed2));
91 sendit(msg, len, arg);
93 if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
94 len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
95 spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
96 spool_stats.total_attr_jobs,
97 edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
99 sendit(msg, len, arg);
102 free_pool_memory(msg);
105 bool begin_data_spool(DCR *dcr)
108 if (!dcr->dev->is_dvd() && dcr->jcr->spool_data) {
109 Dmsg0(100, "Turning on data spooling\n");
110 dcr->spool_data = true;
111 stat = open_data_spool_file(dcr);
113 dcr->spooling = true;
114 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
116 spool_stats.data_jobs++;
123 bool discard_data_spool(DCR *dcr)
126 Dmsg0(100, "Data spooling discarded\n");
127 return close_data_spool_file(dcr);
132 bool commit_data_spool(DCR *dcr)
137 Dmsg0(100, "Committing spooled data\n");
138 stat = despool_data(dcr, true /*commit*/);
140 Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
141 close_data_spool_file(dcr);
144 return close_data_spool_file(dcr);
149 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
152 if (dcr->dev->device->spool_directory) {
153 dir = dcr->dev->device->spool_directory;
155 dir = working_directory;
157 Mmsg(name, "%s/%s.data.%s.%s.spool", dir, my_name, dcr->jcr->Job,
158 dcr->device->hdr.name);
162 static bool open_data_spool_file(DCR *dcr)
164 POOLMEM *name = get_pool_memory(PM_MESSAGE);
167 make_unique_data_spool_filename(dcr, &name);
168 if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
169 dcr->spool_fd = spool_fd;
170 dcr->jcr->spool_attributes = true;
173 Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
175 free_pool_memory(name);
178 Dmsg1(100, "Created spool file: %s\n", name);
179 free_pool_memory(name);
183 static bool close_data_spool_file(DCR *dcr)
185 POOLMEM *name = get_pool_memory(PM_MESSAGE);
188 spool_stats.data_jobs--;
189 spool_stats.total_data_jobs++;
190 if (spool_stats.data_size < dcr->job_spool_size) {
191 spool_stats.data_size = 0;
193 spool_stats.data_size -= dcr->job_spool_size;
195 dcr->job_spool_size = 0;
198 make_unique_data_spool_filename(dcr, &name);
199 close(dcr->spool_fd);
201 dcr->spooling = false;
203 Dmsg1(100, "Deleted spool file: %s\n", name);
204 free_pool_memory(name);
208 static const char *spool_name = "*spool*";
210 static bool despool_data(DCR *dcr, bool commit)
220 Dmsg0(100, "Despooling data\n");
221 /* Commit means that the job is done, so we commit, otherwise, we
222 * are despooling because of user spool size max or some error
223 * (e.g. filesystem full).
226 Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
227 jcr->dcr->VolumeName,
228 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
230 Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
231 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
233 dcr->despool_wait = true;
234 dcr->spooling = false;
235 lock_device(dcr->dev);
236 dcr->despool_wait = false;
237 dcr->despooling = true;
238 dcr->dev_locked = true;
241 * This is really quite kludgy and should be fixed some time.
242 * We create a dev structure to read from the spool file
245 rdev = (DEVICE *)malloc(sizeof(DEVICE));
246 memset(rdev, 0, sizeof(DEVICE));
247 rdev->dev_name = get_memory(strlen(spool_name)+1);
248 bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
249 rdev->errmsg = get_pool_memory(PM_EMSG);
251 rdev->max_block_size = dcr->dev->max_block_size;
252 rdev->min_block_size = dcr->dev->min_block_size;
253 rdev->device = dcr->dev->device;
254 rdcr = new_dcr(NULL, rdev);
255 rdcr->spool_fd = dcr->spool_fd;
256 rdcr->jcr = jcr; /* set a valid jcr */
257 block = dcr->block; /* save block */
258 dcr->block = rdcr->block; /* make read and write block the same */
260 Dmsg1(800, "read/write block size = %d\n", block->buf_len);
261 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
263 /* Add run time, to get current wait time */
264 time_t despool_start = time(NULL) - jcr->run_time;
267 if (job_canceled(jcr)) {
271 stat = read_block_from_spool_file(rdcr);
272 if (stat == RB_EOT) {
274 } else if (stat == RB_ERROR) {
278 ok = write_block_to_device(dcr);
280 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
281 dcr->dev->print_name(), dcr->dev->bstrerror());
283 Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
286 /* Subtracting run_time give us elapsed time - wait_time since we started despooling */
287 time_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
289 if (despool_elapsed <= 0) {
293 Jmsg(dcr->jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s bytes/second\n"),
294 despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
295 edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
297 dcr->block = block; /* reset block */
299 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
300 if (ftruncate(rdcr->spool_fd, 0) != 0) {
302 Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
304 Pmsg1(000, _("Bad return from ftruncate. ERR=%s\n"), be.strerror());
309 if (spool_stats.data_size < dcr->job_spool_size) {
310 spool_stats.data_size = 0;
312 spool_stats.data_size -= dcr->job_spool_size;
315 P(dcr->dev->spool_mutex);
316 dcr->dev->spool_size -= dcr->job_spool_size;
317 dcr->job_spool_size = 0; /* zap size in input dcr */
318 V(dcr->dev->spool_mutex);
319 free_memory(rdev->dev_name);
320 free_pool_memory(rdev->errmsg);
321 /* Be careful to NULL the jcr and free rdev after free_dcr() */
325 dcr->spooling = true; /* turn on spooling again */
326 dcr->despooling = false;
327 /* If doing a commit, leave the device locked -- unlocked in release_device() */
329 dcr->dev_locked = false;
330 unlock_device(dcr->dev);
336 * Read a block from the spool file
338 * Returns RB_OK on success
339 * RB_EOT when file done
342 static int read_block_from_spool_file(DCR *dcr)
347 DEV_BLOCK *block = dcr->block;
350 stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
352 Dmsg0(100, "EOT on spool read.\n");
354 } else if (stat != (ssize_t)rlen) {
357 Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
360 Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
361 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
366 if (rlen > block->buf_len) {
367 Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
368 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
371 stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
372 if (stat != (ssize_t)rlen) {
373 Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
374 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
377 /* Setup write pointers */
378 block->binbuf = rlen;
379 block->bufp = block->buf + block->binbuf;
380 block->FirstIndex = hdr.FirstIndex;
381 block->LastIndex = hdr.LastIndex;
382 block->VolSessionId = dcr->jcr->VolSessionId;
383 block->VolSessionTime = dcr->jcr->VolSessionTime;
384 Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
389 * Write a block to the spool file
391 * Returns: true on success or EOT
392 * false on hard error
394 bool write_block_to_spool_file(DCR *dcr)
396 uint32_t wlen, hlen; /* length to write */
397 bool despool = false;
398 DEV_BLOCK *block = dcr->block;
400 ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
401 if (block->binbuf <= WRITE_BLKHDR_LENGTH) { /* Does block have data in it? */
405 hlen = sizeof(spool_hdr);
406 wlen = block->binbuf;
407 P(dcr->dev->spool_mutex);
408 dcr->job_spool_size += hlen + wlen;
409 dcr->dev->spool_size += hlen + wlen;
410 if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
411 (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
414 V(dcr->dev->spool_mutex);
416 spool_stats.data_size += hlen + wlen;
417 if (spool_stats.data_size > spool_stats.max_data_size) {
418 spool_stats.max_data_size = spool_stats.data_size;
423 char ec1[30], ec2[30], ec3[30], ec4[30];
424 Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
425 "max_job_size=%s job_size=%s\n",
426 edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
427 edit_uint64_with_commas(dcr->job_spool_size, ec2),
428 edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
429 edit_uint64_with_commas(dcr->dev->spool_size, ec4));
431 Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
432 if (!despool_data(dcr, false)) {
433 Pmsg0(000, _("Bad return from despool in write_block.\n"));
436 /* Despooling cleared these variables so reset them */
437 P(dcr->dev->spool_mutex);
438 dcr->job_spool_size += hlen + wlen;
439 dcr->dev->spool_size += hlen + wlen;
440 V(dcr->dev->spool_mutex);
441 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
445 if (!write_spool_header(dcr)) {
448 if (!write_spool_data(dcr)) {
452 Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
457 static bool write_spool_header(DCR *dcr)
461 DEV_BLOCK *block = dcr->block;
463 hdr.FirstIndex = block->FirstIndex;
464 hdr.LastIndex = block->LastIndex;
465 hdr.len = block->binbuf;
468 for (int retry=0; retry<=1; retry++) {
469 stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
472 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
475 if (stat != (ssize_t)sizeof(hdr)) {
476 /* If we wrote something, truncate it, then despool */
478 #if defined(HAVE_WIN32)
479 boffset_t pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
481 boffset_t pos = lseek(dcr->spool_fd, (off_t)0, SEEK_CUR);
483 if (ftruncate(dcr->spool_fd, pos - stat) != 0) {
485 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
490 if (!despool_data(dcr, false)) {
491 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
494 continue; /* try again */
498 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
502 static bool write_spool_data(DCR *dcr)
505 DEV_BLOCK *block = dcr->block;
508 for (int retry=0; retry<=1; retry++) {
509 stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
512 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
515 if (stat != (ssize_t)block->binbuf) {
517 * If we wrote something, truncate it and the header, then despool
520 #if defined(HAVE_WIN32)
521 boffset_t pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
523 boffset_t pos = lseek(dcr->spool_fd, (off_t)0, SEEK_CUR);
525 if (ftruncate(dcr->spool_fd, pos - stat - sizeof(spool_hdr)) != 0) {
527 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
532 if (!despool_data(dcr, false)) {
533 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
536 if (!write_spool_header(dcr)) {
539 continue; /* try again */
543 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
549 bool are_attributes_spooled(JCR *jcr)
551 return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
555 * Create spool file for attributes.
556 * This is done by "attaching" to the bsock, and when
557 * it is called, the output is written to a file.
558 * The actual spooling is turned on and off in
559 * append.c only during writing of the attributes.
561 bool begin_attribute_spool(JCR *jcr)
563 if (!jcr->no_attributes && jcr->spool_attributes) {
564 return open_attr_spool_file(jcr, jcr->dir_bsock);
569 bool discard_attribute_spool(JCR *jcr)
571 if (are_attributes_spooled(jcr)) {
572 return close_attr_spool_file(jcr, jcr->dir_bsock);
577 static void update_attr_spool_size(ssize_t size)
581 if ((spool_stats.attr_size - size) > 0) {
582 spool_stats.attr_size -= size;
584 spool_stats.attr_size = 0;
590 bool commit_attribute_spool(JCR *jcr)
595 if (are_attributes_spooled(jcr)) {
596 if (fseeko(jcr->dir_bsock->spool_fd, 0, SEEK_END) != 0) {
598 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
602 size = ftello(jcr->dir_bsock->spool_fd);
605 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
610 if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
611 spool_stats.max_attr_size = spool_stats.attr_size + size;
613 spool_stats.attr_size += size;
615 Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
616 edit_uint64_with_commas(size, ec1));
617 bnet_despool_to_bsock(jcr->dir_bsock, update_attr_spool_size, size);
618 return close_attr_spool_file(jcr, jcr->dir_bsock);
623 close_attr_spool_file(jcr, jcr->dir_bsock);
627 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
629 Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
634 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
636 POOLMEM *name = get_pool_memory(PM_MESSAGE);
638 make_unique_spool_filename(jcr, &name, bs->fd);
639 bs->spool_fd = fopen(name, "w+b");
642 Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
644 free_pool_memory(name);
648 spool_stats.attr_jobs++;
650 free_pool_memory(name);
654 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
661 name = get_pool_memory(PM_MESSAGE);
663 spool_stats.attr_jobs--;
664 spool_stats.total_attr_jobs++;
666 make_unique_spool_filename(jcr, &name, bs->fd);
667 fclose(bs->spool_fd);
669 free_pool_memory(name);