4 * Kern Sibbald, March 2004
9 Copyright (C) 2004-2006 Kern Sibbald
11 This program is free software; you can redistribute it and/or
12 modify it under the terms of the GNU General Public License
13 version 2 as amended with additional clauses defined in the
14 file LICENSE in the main source directory.
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 file LICENSE for additional details.
26 /* Forward referenced subroutines */
/* Data spool helpers: work on the spool file descriptor kept in the DCR (dcr->spool_fd) */
27 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
28 static bool open_data_spool_file(DCR *dcr);
29 static bool close_data_spool_file(DCR *dcr);
30 static bool despool_data(DCR *dcr, bool commit);
31 static int read_block_from_spool_file(DCR *dcr);
/* Attribute spool helpers: work on the spool stream attached to a BSOCK (bs->spool_fd) */
32 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
33 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
/* Record writers called from write_block_to_spool_file() */
34 static bool write_spool_header(DCR *dcr);
35 static bool write_spool_data(DCR *dcr);
/* Spooling counters kept in the global spool_stats and reported by list_spool_stats() */
37 struct spool_stats_t {
38 uint32_t data_jobs; /* current jobs spooling data */
40 uint32_t total_data_jobs; /* total jobs to have spooled data */
41 uint32_t total_attr_jobs; /* incremented each time an attribute spool file is closed */
42 int64_t max_data_size; /* max data size */
43 int64_t max_attr_size; /* largest attr_size seen, updated in commit_attribute_spool() */
44 int64_t data_size; /* current data size (all jobs running) */
/* NOTE(review): presumably serializes updates to spool_stats; the lock/unlock calls are not visible in this listing -- confirm */
48 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/* Global spooling statistics: read by list_spool_stats() and updated by the spool routines in this file */
49 spool_stats_t spool_stats;
52 * Header for data spool record */
/* These fields are filled from the block by write_spool_header() and copied back into the block by read_block_from_spool_file() */
54 int32_t FirstIndex; /* FirstIndex for buffer */
55 int32_t LastIndex; /* LastIndex for buffer */
56 uint32_t len; /* length of next buffer */
/*
 * Reports the data and attribute spooling statistics through a callback.
 *   sendit - invoked with (text, length, arg) for each line that is produced
 *   arg    - opaque pointer handed back to sendit unchanged
 * A line is formatted for a category only when it has active jobs or a
 * nonzero recorded maximum size.
 */
65 void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
67 char *msg, ed1[30], ed2[30];
/* The message buffer comes from the memory pool and is released at the end */
70 msg = (char *)get_pool_memory(PM_MESSAGE);
72 if (spool_stats.data_jobs || spool_stats.max_data_size) {
73 len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
74 spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
75 spool_stats.total_data_jobs,
76 edit_uint64_with_commas(spool_stats.max_data_size, ed2));
78 sendit(msg, len, arg);
80 if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
81 len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
82 spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
83 spool_stats.total_attr_jobs,
84 edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
86 sendit(msg, len, arg);
89 free_pool_memory(msg);
/*
 * Turns on data spooling for this DCR when the job asked for it
 * (jcr->spool_data) and the device is not a DVD.
 * The spool file is opened through open_data_spool_file(), whose result
 * is kept in stat.
 * NOTE(review): the inner branch structure and the return statement are
 * not visible in this listing.
 */
92 bool begin_data_spool(DCR *dcr)
95 if (!dcr->dev->is_dvd() && dcr->jcr->spool_data) {
96 Dmsg0(100, "Turning on data spooling\n");
97 dcr->spool_data = true;
98 stat = open_data_spool_file(dcr);
100 dcr->spooling = true;
101 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
/* Count this job as actively spooling data */
103 spool_stats.data_jobs++;
/*
 * Abandons the data spool without writing it to the device: the spool
 * file is closed via close_data_spool_file() and that result is returned.
 * NOTE(review): any guarding condition is not visible in this listing.
 */
110 bool discard_data_spool(DCR *dcr)
113 Dmsg0(100, "Data spooling discarded\n");
114 return close_data_spool_file(dcr);
/*
 * Writes all spooled data to the device (despool_data() with commit set)
 * and then closes the spool file.
 * The spool file is also closed on the path that logs a bad despool return.
 * NOTE(review): the conditions around these calls are not visible in this
 * listing.
 */
119 bool commit_data_spool(DCR *dcr)
124 Dmsg0(100, "Committing spooled data\n");
125 stat = despool_data(dcr, true /*commit*/);
127 Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
128 close_data_spool_file(dcr);
131 return close_data_spool_file(dcr);
/*
 * Builds the data spool file path into *name:
 *   <dir>/<my_name>.data.<Job>.<device resource name>.spool
 * dir is the spool_directory of the device resource when one is set,
 * otherwise working_directory.
 */
136 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
139 if (dcr->dev->device->spool_directory) {
140 dir = dcr->dev->device->spool_directory;
142 dir = working_directory;
144 Mmsg(name, "%s/%s.data.%s.%s.spool", dir, my_name, dcr->jcr->Job,
145 dcr->device->hdr.name);
/*
 * Creates (or truncates) the data spool file for this DCR, opened
 * read/write with mode 0640, and stores the descriptor in dcr->spool_fd.
 * A successful open also sets jcr->spool_attributes.
 * If the open fails, a fatal job message naming the file is issued.
 * The pool buffer holding the file name is released on both paths.
 */
149 static bool open_data_spool_file(DCR *dcr)
151 POOLMEM *name = get_pool_memory(PM_MESSAGE);
154 make_unique_data_spool_filename(dcr, &name);
155 if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
156 dcr->spool_fd = spool_fd;
157 dcr->jcr->spool_attributes = true;
160 Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
162 free_pool_memory(name);
165 Dmsg1(100, "Created spool file: %s\n", name);
166 free_pool_memory(name);
/*
 * Closes the data spool file of this DCR and updates the bookkeeping:
 * one less active data job, one more total data job, the size spooled by
 * this job removed from the global data_size (clamped at zero), and
 * dcr->job_spool_size and dcr->spooling cleared.
 * NOTE(review): the debug message reports the file as deleted, but the
 * call that removes it is not visible in this listing.
 */
170 static bool close_data_spool_file(DCR *dcr)
172 POOLMEM *name = get_pool_memory(PM_MESSAGE);
175 spool_stats.data_jobs--;
176 spool_stats.total_data_jobs++;
/* Never let the global size go negative */
177 if (spool_stats.data_size < dcr->job_spool_size) {
178 spool_stats.data_size = 0;
180 spool_stats.data_size -= dcr->job_spool_size;
182 dcr->job_spool_size = 0;
185 make_unique_data_spool_filename(dcr, &name);
186 close(dcr->spool_fd);
188 dcr->spooling = false;
190 Dmsg1(100, "Deleted spool file: %s\n", name);
191 free_pool_memory(name);
195 static const char *spool_name = "*spool*";
/*
 * Copies everything currently held in the data spool file of this DCR to
 * the real device.
 *   commit - true when the job is done and the data is being committed;
 *            false when despooling because a spool size limit or an error
 *            was hit.
 * A temporary DEVICE/DCR pair (rdev/rdcr) is built to read the spool file,
 * and the read and write DCRs share a single block buffer while copying.
 * Afterwards the spool file is truncated to zero length and the global,
 * device and job size counters are reduced by the despooled amount.
 */
197 static bool despool_data(DCR *dcr, bool commit)
207 Dmsg0(100, "Despooling data\n");
208 /* Commit means that the job is done, so we commit, otherwise, we
209 * are despooling because of user spool size max or some error
210 * (e.g. filesystem full).
213 Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
214 jcr->dcr->VolumeName,
215 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
217 Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
218 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
220 dcr->despool_wait = true;
221 dcr->spooling = false;
222 lock_device(dcr->dev);
223 dcr->despool_wait = false;
224 dcr->despooling = true;
225 dcr->dev_locked = true;
228 * This is really quite kludgy and should be fixed some time.
229 * We create a dev structure to read from the spool file
232 rdev = (DEVICE *)malloc(sizeof(DEVICE));
233 memset(rdev, 0, sizeof(DEVICE));
234 rdev->dev_name = get_memory(strlen(spool_name)+1);
/* dev_name is a pointer, so sizeof() would only give the pointer size; bound the copy by the allocated length instead */
235 bstrncpy(rdev->dev_name, spool_name, strlen(spool_name)+1);
236 rdev->errmsg = get_pool_memory(PM_EMSG);
238 rdev->max_block_size = dcr->dev->max_block_size;
239 rdev->min_block_size = dcr->dev->min_block_size;
240 rdev->device = dcr->dev->device;
241 rdcr = new_dcr(NULL, rdev);
242 rdcr->spool_fd = dcr->spool_fd;
243 rdcr->jcr = jcr; /* set a valid jcr */
244 block = dcr->block; /* save block */
245 dcr->block = rdcr->block; /* make read and write block the same */
247 Dmsg1(800, "read/write block size = %d\n", block->buf_len);
248 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
250 /* Add run time, to get current wait time */
251 time_t despool_start = time(NULL) - jcr->run_time;
254 if (job_canceled(jcr)) {
258 stat = read_block_from_spool_file(rdcr);
259 if (stat == RB_EOT) {
261 } else if (stat == RB_ERROR) {
265 ok = write_block_to_device(dcr);
267 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
268 dcr->dev->print_name(), dcr->dev->bstrerror());
270 Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
273 /* Subtracting run_time gives us elapsed time - wait_time since we started despooling */
274 time_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
276 if (despool_elapsed <= 0) {
/* time_t values are cast to int so they match the %02d conversions */
280 Jmsg(dcr->jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s bytes/second\n"),
281 (int)(despool_elapsed / 3600), (int)(despool_elapsed % 3600 / 60), (int)(despool_elapsed % 60),
282 edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
284 dcr->block = block; /* reset block */
286 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
287 if (ftruncate(rdcr->spool_fd, 0) != 0) {
289 Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
291 Pmsg1(000, _("Bad return from ftruncate. ERR=%s\n"), be.strerror());
296 if (spool_stats.data_size < dcr->job_spool_size) {
297 spool_stats.data_size = 0;
299 spool_stats.data_size -= dcr->job_spool_size;
302 P(dcr->dev->spool_mutex);
303 dcr->dev->spool_size -= dcr->job_spool_size;
304 dcr->job_spool_size = 0; /* zap size in input dcr */
305 V(dcr->dev->spool_mutex);
306 free_memory(rdev->dev_name);
307 free_pool_memory(rdev->errmsg);
308 /* Be careful to NULL the jcr and free rdev after free_dcr() */
312 dcr->spooling = true; /* turn on spooling again */
313 dcr->despooling = false;
314 /* If doing a commit, leave the device locked -- unlocked in release_device() */
316 dcr->dev_locked = false;
317 unlock_device(dcr->dev);
323 * Read a block from the spool file
325 * Returns RB_OK on success
326 * RB_EOT when file done
/*
 * Reads one spooled record into the block of this DCR: first the record
 * header into hdr, then the payload into block->buf. The block write
 * pointers and indexes are then set from the header, and the session id
 * and time are taken from the job.
 * A short header or data read, or a payload larger than block->buf_len,
 * is reported as a fatal job error.
 * NOTE(review): the declarations of hdr, rlen and stat, the assignments
 * to rlen and the return statements are not visible in this listing.
 */
329 static int read_block_from_spool_file(DCR *dcr)
334 DEV_BLOCK *block = dcr->block;
337 stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
339 Dmsg0(100, "EOT on spool read.\n");
341 } else if (stat != (ssize_t)rlen) {
344 Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
347 Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
348 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
/* Refuse a payload that would not fit in the block buffer */
353 if (rlen > block->buf_len) {
354 Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
355 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
358 stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
359 if (stat != (ssize_t)rlen) {
360 Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
361 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
364 /* Setup write pointers */
365 block->binbuf = rlen;
366 block->bufp = block->buf + block->binbuf;
367 block->FirstIndex = hdr.FirstIndex;
368 block->LastIndex = hdr.LastIndex;
369 block->VolSessionId = dcr->jcr->VolSessionId;
370 block->VolSessionTime = dcr->jcr->VolSessionTime;
371 Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
376 * Write a block to the spool file
378 * Returns: true on success or EOT
379 * false on hard error
/*
 * Appends the current block of this DCR to the data spool file as a
 * record header followed by the block data.
 * Before writing, the job and device spool sizes are increased by
 * header + data length under dev->spool_mutex. The sizes are then tested
 * against the job and device maximums; a maximum of 0 disables that test.
 * When despooling is needed, the spool is first written to the device
 * with despool_data() and the sizes are added again, because despooling
 * cleared them.
 * NOTE(review): the handling of a block with no data, the setting of the
 * despool flag and the return statements are not visible in this listing.
 */
381 bool write_block_to_spool_file(DCR *dcr)
383 uint32_t wlen, hlen; /* length to write */
384 bool despool = false;
385 DEV_BLOCK *block = dcr->block;
387 ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
388 if (block->binbuf <= WRITE_BLKHDR_LENGTH) { /* Does block have data in it? */
392 hlen = sizeof(spool_hdr);
393 wlen = block->binbuf;
394 P(dcr->dev->spool_mutex);
395 dcr->job_spool_size += hlen + wlen;
396 dcr->dev->spool_size += hlen + wlen;
397 if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
398 (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
401 V(dcr->dev->spool_mutex);
/* Track the global size and its maximum */
403 spool_stats.data_size += hlen + wlen;
404 if (spool_stats.data_size > spool_stats.max_data_size) {
405 spool_stats.max_data_size = spool_stats.data_size;
410 char ec1[30], ec2[30], ec3[30], ec4[30];
411 Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
412 "max_job_size=%s job_size=%s\n",
413 edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
414 edit_uint64_with_commas(dcr->job_spool_size, ec2),
415 edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
416 edit_uint64_with_commas(dcr->dev->spool_size, ec4));
418 Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
419 if (!despool_data(dcr, false)) {
420 Pmsg0(000, _("Bad return from despool in write_block.\n"));
423 /* Despooling cleared these variables so reset them */
424 P(dcr->dev->spool_mutex);
425 dcr->job_spool_size += hlen + wlen;
426 dcr->dev->spool_size += hlen + wlen;
427 V(dcr->dev->spool_mutex);
428 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
432 if (!write_spool_header(dcr)) {
435 if (!write_spool_data(dcr)) {
439 Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
/*
 * Writes the spool record header for the current block of this DCR:
 * FirstIndex, LastIndex and len (taken from block->binbuf).
 * At most two attempts are made. After a short write, any partially
 * written bytes are truncated away, the spool is despooled to the device,
 * and the write is tried again. A fatal job message is issued when the
 * retry does not succeed either.
 * NOTE(review): the exact conditions guarding the error branches and the
 * return statements are not visible in this listing.
 */
444 static bool write_spool_header(DCR *dcr)
448 DEV_BLOCK *block = dcr->block;
450 hdr.FirstIndex = block->FirstIndex;
451 hdr.LastIndex = block->LastIndex;
452 hdr.len = block->binbuf;
455 for (int retry=0; retry<=1; retry++) {
456 stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
459 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
462 if (stat != (ssize_t)sizeof(hdr)) {
463 /* If we wrote something, truncate it, then despool */
/* New length = current file offset minus the bytes just written */
465 if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR) - stat) != 0) {
467 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
472 if (!despool_data(dcr, false)) {
473 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
476 continue; /* try again */
480 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
/*
 * Writes the data of the current block (block->binbuf bytes from
 * block->buf) to the spool file, after its header has been written.
 * At most two attempts are made. After a short write, the partial data
 * and the preceding record header are truncated away, the spool is
 * despooled to the device, the header is written again, and the data
 * write is retried. A fatal job message is issued when the retry fails.
 * NOTE(review): the exact conditions guarding the error branches and the
 * return statements are not visible in this listing.
 */
484 static bool write_spool_data(DCR *dcr)
487 DEV_BLOCK *block = dcr->block;
490 for (int retry=0; retry<=1; retry++) {
491 stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
494 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
497 if (stat != (ssize_t)block->binbuf) {
499 * If we wrote something, truncate it and the header, then despool
502 if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR)
503 - stat - sizeof(spool_hdr)) != 0) {
505 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
510 if (!despool_data(dcr, false)) {
511 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
/* The header was truncated together with the data, so write it again */
514 if (!write_spool_header(dcr)) {
517 continue; /* try again */
521 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
/*
 * Returns true when attribute spooling is enabled for the job and the
 * Director socket (jcr->dir_bsock) has a spool file attached.
 */
527 bool are_attributes_spooled(JCR *jcr)
529 return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
533 * Create spool file for attributes.
534 * This is done by "attaching" to the bsock, and when
535 * it is called, the output is written to a file.
536 * The actual spooling is turned on and off in
537 * append.c only during writing of the attributes.
/*
 * Opens the attribute spool file on the Director socket, but only when
 * the job is not flagged no_attributes and attribute spooling is enabled.
 * NOTE(review): the value returned when the condition is false is not
 * visible in this listing.
 */
539 bool begin_attribute_spool(JCR *jcr)
541 if (!jcr->no_attributes && jcr->spool_attributes) {
542 return open_attr_spool_file(jcr, jcr->dir_bsock);
/*
 * Closes the attribute spool file without sending it, when attributes are
 * currently being spooled.
 * NOTE(review): the value returned when nothing is spooled is not visible
 * in this listing.
 */
547 bool discard_attribute_spool(JCR *jcr)
549 if (are_attributes_spooled(jcr)) {
550 return close_attr_spool_file(jcr, jcr->dir_bsock);
/*
 * Reduces the global attribute spool size by size, clamping at zero.
 * Passed as a callback to bnet_despool_to_bsock() by
 * commit_attribute_spool().
 */
555 static void update_attr_spool_size(ssize_t size)
559 if ((spool_stats.attr_size - size) > 0) {
560 spool_stats.attr_size -= size;
562 spool_stats.attr_size = 0;
/*
 * Sends the spooled attributes to the Director and closes the attribute
 * spool file.
 * The size of the spool file is found by seeking to its end and reading
 * the offset; that size is added to the global attr_size (raising
 * max_attr_size when exceeded) before the data is despooled to the socket.
 * A failing seek or offset query is reported as a fatal job error, and the
 * final statement below closes the spool file.
 * NOTE(review): the jumps to the error path and the return statements are
 * not visible in this listing.
 */
568 bool commit_attribute_spool(JCR *jcr)
573 if (are_attributes_spooled(jcr)) {
574 if (fseeko(jcr->dir_bsock->spool_fd, 0, SEEK_END) != 0) {
576 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
580 size = ftello(jcr->dir_bsock->spool_fd);
/* This message follows the ftello() call, so name that call rather than fseek */
583 Jmsg(jcr, M_FATAL, 0, _("Ftell on attributes file failed: ERR=%s\n"),
588 if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
589 spool_stats.max_attr_size = spool_stats.attr_size + size;
591 spool_stats.attr_size += size;
593 Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
594 edit_uint64_with_commas(size, ec1));
595 bnet_despool_to_bsock(jcr->dir_bsock, update_attr_spool_size, size);
596 return close_attr_spool_file(jcr, jcr->dir_bsock);
601 close_attr_spool_file(jcr, jcr->dir_bsock);
/*
 * Builds the attribute spool file path into *name, of the form
 *   <working_directory>/<my_name>.attr.<string>.<number>.spool
 * NOTE(review): the arguments supplying the last two fields are on a line
 * that is not visible in this listing.
 */
605 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
607 Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
/*
 * Creates the attribute spool file (opened with mode w+b) and attaches the
 * stream to bs->spool_fd. The file name is derived from the socket
 * descriptor bs->fd.
 * A failed open is reported as a fatal job error; otherwise the count of
 * active attribute spool jobs is incremented.
 * The pool buffer holding the name is released on both paths.
 */
612 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
614 POOLMEM *name = get_pool_memory(PM_MESSAGE);
616 make_unique_spool_filename(jcr, &name, bs->fd);
617 bs->spool_fd = fopen(name, "w+b");
620 Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
622 free_pool_memory(name);
626 spool_stats.attr_jobs++;
628 free_pool_memory(name);
/*
 * Closes the attribute spool stream attached to bs and updates the
 * counters: one less active attribute job, one more total attribute job.
 * NOTE(review): the file name is rebuilt here, presumably to remove the
 * file, but that call and any reset of bs->spool_fd are not visible in
 * this listing.
 */
632 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
639 name = get_pool_memory(PM_MESSAGE);
641 spool_stats.attr_jobs--;
642 spool_stats.total_attr_jobs++;
644 make_unique_spool_filename(jcr, &name, bs->fd);
645 fclose(bs->spool_fd);
647 free_pool_memory(name);