4 * Kern Sibbald, March 2004
9 Bacula® - The Network Backup Solution
11 Copyright (C) 2004-2006 Free Software Foundation Europe e.V.
13 The main author of Bacula is Kern Sibbald, with contributions from
14 many others, a complete list can be found in the file AUTHORS.
15 This program is Free Software; you can redistribute it and/or
16 modify it under the terms of version two of the GNU General Public
17 License as published by the Free Software Foundation plus additions
18 that are listed in the file LICENSE.
20 This program is distributed in the hope that it will be useful, but
21 WITHOUT ANY WARRANTY; without even the implied warranty of
22 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
23 General Public License for more details.
25 You should have received a copy of the GNU General Public License
26 along with this program; if not, write to the Free Software
27 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
30 Bacula® is a registered trademark of John Walker.
31 The licensor of Bacula is the Free Software Foundation Europe
32 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
33 Switzerland, email:ftf@fsfeurope.org.
39 /* Forward referenced subroutines */
40 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
41 static bool open_data_spool_file(DCR *dcr);
42 static bool close_data_spool_file(DCR *dcr);
43 static bool despool_data(DCR *dcr, bool commit);
44 static int read_block_from_spool_file(DCR *dcr);
45 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
46 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
47 static bool write_spool_header(DCR *dcr);
48 static bool write_spool_data(DCR *dcr);
50 struct spool_stats_t {
51 uint32_t data_jobs; /* current jobs spooling data */
53 uint32_t total_data_jobs; /* total jobs to have spooled data */
54 uint32_t total_attr_jobs;
55 int64_t max_data_size; /* max data size */
56 int64_t max_attr_size;
57 int64_t data_size; /* current data size (all jobs running) */
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62 spool_stats_t spool_stats;
65 * Header for data spool record */
67 int32_t FirstIndex; /* FirstIndex for buffer */
68 int32_t LastIndex; /* LastIndex for buffer */
69 uint32_t len; /* length of next buffer */
78 void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
80 char *msg, ed1[30], ed2[30];
83 msg = (char *)get_pool_memory(PM_MESSAGE);
85 if (spool_stats.data_jobs || spool_stats.max_data_size) {
86 len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
87 spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
88 spool_stats.total_data_jobs,
89 edit_uint64_with_commas(spool_stats.max_data_size, ed2));
91 sendit(msg, len, arg);
93 if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
94 len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
95 spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
96 spool_stats.total_attr_jobs,
97 edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
99 sendit(msg, len, arg);
102 free_pool_memory(msg);
105 bool begin_data_spool(DCR *dcr)
108 if (!dcr->dev->is_dvd() && dcr->jcr->spool_data) {
109 Dmsg0(100, "Turning on data spooling\n");
110 dcr->spool_data = true;
111 stat = open_data_spool_file(dcr);
113 dcr->spooling = true;
114 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
116 spool_stats.data_jobs++;
123 bool discard_data_spool(DCR *dcr)
126 Dmsg0(100, "Data spooling discarded\n");
127 return close_data_spool_file(dcr);
132 bool commit_data_spool(DCR *dcr)
137 Dmsg0(100, "Committing spooled data\n");
138 stat = despool_data(dcr, true /*commit*/);
140 Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
141 close_data_spool_file(dcr);
144 return close_data_spool_file(dcr);
149 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
152 if (dcr->dev->device->spool_directory) {
153 dir = dcr->dev->device->spool_directory;
155 dir = working_directory;
157 Mmsg(name, "%s/%s.data.%u.%s.%s.spool", dir, my_name, dcr->jcr->JobId,
158 dcr->jcr->Job, dcr->device->hdr.name);
162 static bool open_data_spool_file(DCR *dcr)
164 POOLMEM *name = get_pool_memory(PM_MESSAGE);
167 make_unique_data_spool_filename(dcr, &name);
168 if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
169 dcr->spool_fd = spool_fd;
170 dcr->jcr->spool_attributes = true;
173 Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
175 free_pool_memory(name);
178 Dmsg1(100, "Created spool file: %s\n", name);
179 free_pool_memory(name);
183 static bool close_data_spool_file(DCR *dcr)
185 POOLMEM *name = get_pool_memory(PM_MESSAGE);
188 spool_stats.data_jobs--;
189 spool_stats.total_data_jobs++;
190 if (spool_stats.data_size < dcr->job_spool_size) {
191 spool_stats.data_size = 0;
193 spool_stats.data_size -= dcr->job_spool_size;
195 dcr->job_spool_size = 0;
198 make_unique_data_spool_filename(dcr, &name);
199 close(dcr->spool_fd);
201 dcr->spooling = false;
203 Dmsg1(100, "Deleted spool file: %s\n", name);
204 free_pool_memory(name);
208 static const char *spool_name = "*spool*";
210 static bool despool_data(DCR *dcr, bool commit)
220 Dmsg0(100, "Despooling data\n");
221 /* Commit means that the job is done, so we commit, otherwise, we
222 * are despooling because of user spool size max or some error
223 * (e.g. filesystem full).
226 Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
227 jcr->dcr->VolumeName,
228 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
230 Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
231 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
233 dcr->despool_wait = true;
234 dcr->spooling = false;
235 lock_device(dcr->dev);
236 dcr->despool_wait = false;
237 dcr->despooling = true;
238 dcr->dev_locked = true;
241 * This is really quite kludgy and should be fixed some time.
242 * We create a dev structure to read from the spool file
245 rdev = (DEVICE *)malloc(sizeof(DEVICE));
246 memset(rdev, 0, sizeof(DEVICE));
247 rdev->dev_name = get_memory(strlen(spool_name)+1);
248 bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
249 rdev->errmsg = get_pool_memory(PM_EMSG);
251 rdev->max_block_size = dcr->dev->max_block_size;
252 rdev->min_block_size = dcr->dev->min_block_size;
253 rdev->device = dcr->dev->device;
254 rdcr = new_dcr(NULL, rdev);
255 rdcr->spool_fd = dcr->spool_fd;
256 rdcr->jcr = jcr; /* set a valid jcr */
257 block = dcr->block; /* save block */
258 dcr->block = rdcr->block; /* make read and write block the same */
260 Dmsg1(800, "read/write block size = %d\n", block->buf_len);
261 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
263 /* Add run time, to get current wait time */
264 time_t despool_start = time(NULL) - jcr->run_time;
267 if (job_canceled(jcr)) {
271 stat = read_block_from_spool_file(rdcr);
272 if (stat == RB_EOT) {
274 } else if (stat == RB_ERROR) {
278 ok = write_block_to_device(dcr);
280 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
281 dcr->dev->print_name(), dcr->dev->bstrerror());
283 Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
286 /* Subtracting run_time give us elapsed time - wait_time since we started despooling */
287 time_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
289 if (despool_elapsed <= 0) {
293 Jmsg(dcr->jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s bytes/second\n"),
294 despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
295 edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
297 dcr->block = block; /* reset block */
299 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
300 if (ftruncate(rdcr->spool_fd, 0) != 0) {
302 Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
304 Pmsg1(000, _("Bad return from ftruncate. ERR=%s\n"), be.strerror());
309 if (spool_stats.data_size < dcr->job_spool_size) {
310 spool_stats.data_size = 0;
312 spool_stats.data_size -= dcr->job_spool_size;
315 P(dcr->dev->spool_mutex);
316 dcr->dev->spool_size -= dcr->job_spool_size;
317 dcr->job_spool_size = 0; /* zap size in input dcr */
318 V(dcr->dev->spool_mutex);
319 free_memory(rdev->dev_name);
320 free_pool_memory(rdev->errmsg);
321 /* Be careful to NULL the jcr and free rdev after free_dcr() */
326 dcr->spooling = true; /* turn on spooling again */
327 dcr->despooling = false;
328 /* If doing a commit, leave the device locked -- unlocked in release_device() */
330 dcr->dev_locked = false;
331 unlock_device(dcr->dev);
337 * Read a block from the spool file
339 * Returns RB_OK on success
340 * RB_EOT when file done
343 static int read_block_from_spool_file(DCR *dcr)
348 DEV_BLOCK *block = dcr->block;
351 stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
353 Dmsg0(100, "EOT on spool read.\n");
355 } else if (stat != (ssize_t)rlen) {
358 Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
361 Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
362 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
367 if (rlen > block->buf_len) {
368 Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
369 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
372 stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
373 if (stat != (ssize_t)rlen) {
374 Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
375 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
378 /* Setup write pointers */
379 block->binbuf = rlen;
380 block->bufp = block->buf + block->binbuf;
381 block->FirstIndex = hdr.FirstIndex;
382 block->LastIndex = hdr.LastIndex;
383 block->VolSessionId = dcr->jcr->VolSessionId;
384 block->VolSessionTime = dcr->jcr->VolSessionTime;
385 Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
390 * Write a block to the spool file
392 * Returns: true on success or EOT
393 * false on hard error
395 bool write_block_to_spool_file(DCR *dcr)
397 uint32_t wlen, hlen; /* length to write */
398 bool despool = false;
399 DEV_BLOCK *block = dcr->block;
401 ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
402 if (block->binbuf <= WRITE_BLKHDR_LENGTH) { /* Does block have data in it? */
406 hlen = sizeof(spool_hdr);
407 wlen = block->binbuf;
408 P(dcr->dev->spool_mutex);
409 dcr->job_spool_size += hlen + wlen;
410 dcr->dev->spool_size += hlen + wlen;
411 if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
412 (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
415 V(dcr->dev->spool_mutex);
417 spool_stats.data_size += hlen + wlen;
418 if (spool_stats.data_size > spool_stats.max_data_size) {
419 spool_stats.max_data_size = spool_stats.data_size;
424 char ec1[30], ec2[30], ec3[30], ec4[30];
425 Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
426 "max_job_size=%s job_size=%s\n",
427 edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
428 edit_uint64_with_commas(dcr->job_spool_size, ec2),
429 edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
430 edit_uint64_with_commas(dcr->dev->spool_size, ec4));
432 Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
433 if (!despool_data(dcr, false)) {
434 Pmsg0(000, _("Bad return from despool in write_block.\n"));
437 /* Despooling cleared these variables so reset them */
438 P(dcr->dev->spool_mutex);
439 dcr->job_spool_size += hlen + wlen;
440 dcr->dev->spool_size += hlen + wlen;
441 V(dcr->dev->spool_mutex);
442 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
446 if (!write_spool_header(dcr)) {
449 if (!write_spool_data(dcr)) {
453 Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
458 static bool write_spool_header(DCR *dcr)
462 DEV_BLOCK *block = dcr->block;
464 hdr.FirstIndex = block->FirstIndex;
465 hdr.LastIndex = block->LastIndex;
466 hdr.len = block->binbuf;
469 for (int retry=0; retry<=1; retry++) {
470 stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
473 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
476 if (stat != (ssize_t)sizeof(hdr)) {
477 /* If we wrote something, truncate it, then despool */
479 #if defined(HAVE_WIN32)
480 boffset_t pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
482 boffset_t pos = lseek(dcr->spool_fd, (off_t)0, SEEK_CUR);
484 if (ftruncate(dcr->spool_fd, pos - stat) != 0) {
486 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
491 if (!despool_data(dcr, false)) {
492 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
495 continue; /* try again */
499 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
503 static bool write_spool_data(DCR *dcr)
506 DEV_BLOCK *block = dcr->block;
509 for (int retry=0; retry<=1; retry++) {
510 stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
513 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
516 if (stat != (ssize_t)block->binbuf) {
518 * If we wrote something, truncate it and the header, then despool
521 #if defined(HAVE_WIN32)
522 boffset_t pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
524 boffset_t pos = lseek(dcr->spool_fd, (off_t)0, SEEK_CUR);
526 if (ftruncate(dcr->spool_fd, pos - stat - sizeof(spool_hdr)) != 0) {
528 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
533 if (!despool_data(dcr, false)) {
534 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
537 if (!write_spool_header(dcr)) {
540 continue; /* try again */
544 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
550 bool are_attributes_spooled(JCR *jcr)
552 return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
556 * Create spool file for attributes.
557 * This is done by "attaching" to the bsock, and when
558 * it is called, the output is written to a file.
559 * The actual spooling is turned on and off in
560 * append.c only during writing of the attributes.
562 bool begin_attribute_spool(JCR *jcr)
564 if (!jcr->no_attributes && jcr->spool_attributes) {
565 return open_attr_spool_file(jcr, jcr->dir_bsock);
570 bool discard_attribute_spool(JCR *jcr)
572 if (are_attributes_spooled(jcr)) {
573 return close_attr_spool_file(jcr, jcr->dir_bsock);
578 static void update_attr_spool_size(ssize_t size)
582 if ((spool_stats.attr_size - size) > 0) {
583 spool_stats.attr_size -= size;
585 spool_stats.attr_size = 0;
591 bool commit_attribute_spool(JCR *jcr)
596 if (are_attributes_spooled(jcr)) {
597 if (fseeko(jcr->dir_bsock->spool_fd, 0, SEEK_END) != 0) {
599 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
603 size = ftello(jcr->dir_bsock->spool_fd);
606 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
611 if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
612 spool_stats.max_attr_size = spool_stats.attr_size + size;
614 spool_stats.attr_size += size;
616 Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
617 edit_uint64_with_commas(size, ec1));
618 bnet_despool_to_bsock(jcr->dir_bsock, update_attr_spool_size, size);
619 return close_attr_spool_file(jcr, jcr->dir_bsock);
624 close_attr_spool_file(jcr, jcr->dir_bsock);
628 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
630 Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
635 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
637 POOLMEM *name = get_pool_memory(PM_MESSAGE);
639 make_unique_spool_filename(jcr, &name, bs->fd);
640 bs->spool_fd = fopen(name, "w+b");
643 Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
645 free_pool_memory(name);
649 spool_stats.attr_jobs++;
651 free_pool_memory(name);
655 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
662 name = get_pool_memory(PM_MESSAGE);
664 spool_stats.attr_jobs--;
665 spool_stats.total_attr_jobs++;
667 make_unique_spool_filename(jcr, &name, bs->fd);
668 fclose(bs->spool_fd);
670 free_pool_memory(name);