4 * Kern Sibbald, March 2004
9 Copyright (C) 2004-2006 Kern Sibbald
11 This program is free software; you can redistribute it and/or
12 modify it under the terms of the GNU General Public License
13 version 2 as amended with additional clauses defined in the
14 file LICENSE in the main source directory.
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 the file LICENSE for additional details.
26 /* Forward referenced subroutines */
27 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
28 static bool open_data_spool_file(DCR *dcr);
29 static bool close_data_spool_file(DCR *dcr);
30 static bool despool_data(DCR *dcr, bool commit);
31 static int read_block_from_spool_file(DCR *dcr);
32 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
33 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
34 static bool write_spool_header(DCR *dcr);
35 static bool write_spool_data(DCR *dcr);
37 struct spool_stats_t {
38 uint32_t data_jobs; /* current jobs spooling data */
40 uint32_t total_data_jobs; /* total jobs to have spooled data */
41 uint32_t total_attr_jobs;
42 int64_t max_data_size; /* max data size */
43 int64_t max_attr_size;
44 int64_t data_size; /* current data size (all jobs running) */
48 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
49 spool_stats_t spool_stats;
52 * Header for data spool record */
54 int32_t FirstIndex; /* FirstIndex for buffer */
55 int32_t LastIndex; /* LastIndex for buffer */
56 uint32_t len; /* length of next buffer */
65 void list_spool_stats(BSOCK *bs)
67 char ed1[30], ed2[30];
68 if (spool_stats.data_jobs || spool_stats.max_data_size) {
69 bnet_fsend(bs, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
70 spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
71 spool_stats.total_data_jobs,
72 edit_uint64_with_commas(spool_stats.max_data_size, ed2));
74 if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
75 bnet_fsend(bs, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
76 spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
77 spool_stats.total_attr_jobs,
78 edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
82 bool begin_data_spool(DCR *dcr)
85 if (dcr->jcr->spool_data) {
86 Dmsg0(100, "Turning on data spooling\n");
87 dcr->spool_data = true;
88 stat = open_data_spool_file(dcr);
91 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
93 spool_stats.data_jobs++;
100 bool discard_data_spool(DCR *dcr)
103 Dmsg0(100, "Data spooling discarded\n");
104 return close_data_spool_file(dcr);
109 bool commit_data_spool(DCR *dcr)
114 Dmsg0(100, "Committing spooled data\n");
115 stat = despool_data(dcr, true /*commit*/);
117 Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
118 close_data_spool_file(dcr);
121 return close_data_spool_file(dcr);
126 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
129 if (dcr->dev->device->spool_directory) {
130 dir = dcr->dev->device->spool_directory;
132 dir = working_directory;
134 Mmsg(name, "%s/%s.data.%s.%s.spool", dir, my_name, dcr->jcr->Job,
135 dcr->device->hdr.name);
139 static bool open_data_spool_file(DCR *dcr)
141 POOLMEM *name = get_pool_memory(PM_MESSAGE);
144 make_unique_data_spool_filename(dcr, &name);
145 if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
146 dcr->spool_fd = spool_fd;
147 dcr->jcr->spool_attributes = true;
150 Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
152 free_pool_memory(name);
155 Dmsg1(100, "Created spool file: %s\n", name);
156 free_pool_memory(name);
160 static bool close_data_spool_file(DCR *dcr)
162 POOLMEM *name = get_pool_memory(PM_MESSAGE);
165 spool_stats.data_jobs--;
166 spool_stats.total_data_jobs++;
167 if (spool_stats.data_size < dcr->job_spool_size) {
168 spool_stats.data_size = 0;
170 spool_stats.data_size -= dcr->job_spool_size;
172 dcr->job_spool_size = 0;
175 make_unique_data_spool_filename(dcr, &name);
176 close(dcr->spool_fd);
178 dcr->spooling = false;
180 Dmsg1(100, "Deleted spool file: %s\n", name);
181 free_pool_memory(name);
185 static const char *spool_name = "*spool*";
187 static bool despool_data(DCR *dcr, bool commit)
197 Dmsg0(100, "Despooling data\n");
198 /* Commit means that the job is done, so we commit, otherwise, we
199 * are despooling because of user spool size max or some error
200 * (e.g. filesystem full).
203 Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
204 jcr->dcr->VolumeName,
205 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
207 Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
208 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
210 dcr->despool_wait = true;
211 dcr->spooling = false;
212 lock_device(dcr->dev);
213 dcr->despool_wait = false;
214 dcr->despooling = true;
215 dcr->dev_locked = true;
218 * This is really quite kludgy and should be fixed some time.
219 * We create a dev structure to read from the spool file
222 rdev = (DEVICE *)malloc(sizeof(DEVICE));
223 memset(rdev, 0, sizeof(DEVICE));
224 rdev->dev_name = get_memory(strlen(spool_name)+1);
225 bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
226 rdev->errmsg = get_pool_memory(PM_EMSG);
228 rdev->max_block_size = dcr->dev->max_block_size;
229 rdev->min_block_size = dcr->dev->min_block_size;
230 rdev->device = dcr->dev->device;
231 rdcr = new_dcr(NULL, rdev);
232 rdcr->spool_fd = dcr->spool_fd;
233 rdcr->jcr = jcr; /* set a valid jcr */
234 block = dcr->block; /* save block */
235 dcr->block = rdcr->block; /* make read and write block the same */
237 Dmsg1(800, "read/write block size = %d\n", block->buf_len);
238 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
240 time_t despool_start = time(NULL);
243 if (job_canceled(jcr)) {
247 stat = read_block_from_spool_file(rdcr);
248 if (stat == RB_EOT) {
250 } else if (stat == RB_ERROR) {
254 ok = write_block_to_device(dcr);
256 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
257 dcr->dev->print_name(), dcr->dev->bstrerror());
259 Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
262 time_t despool_elapsed = time(NULL) - despool_start;
264 if (despool_elapsed == 0)
267 Jmsg(dcr->jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s bytes/second\n"),
268 despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
269 edit_uint64_with_commas(jcr->dcr->job_spool_size / despool_elapsed, ec1));
271 dcr->block = block; /* reset block */
273 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
274 if (ftruncate(rdcr->spool_fd, 0) != 0) {
276 Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
278 Pmsg1(000, _("Bad return from ftruncate. ERR=%s\n"), be.strerror());
283 if (spool_stats.data_size < dcr->job_spool_size) {
284 spool_stats.data_size = 0;
286 spool_stats.data_size -= dcr->job_spool_size;
289 P(dcr->dev->spool_mutex);
290 dcr->dev->spool_size -= dcr->job_spool_size;
291 dcr->job_spool_size = 0; /* zap size in input dcr */
292 V(dcr->dev->spool_mutex);
293 free_memory(rdev->dev_name);
294 free_pool_memory(rdev->errmsg);
295 /* Be careful to NULL the jcr and free rdev after free_dcr() */
299 dcr->dev_locked = false;
300 dcr->spooling = true; /* turn on spooling again */
301 dcr->despooling = false;
302 unlock_device(dcr->dev);
307 * Read a block from the spool file
309 * Returns RB_OK on success
310 * RB_EOT when file done
313 static int read_block_from_spool_file(DCR *dcr)
318 DEV_BLOCK *block = dcr->block;
321 stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
323 Dmsg0(100, "EOT on spool read.\n");
325 } else if (stat != (ssize_t)rlen) {
328 Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
331 Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
332 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
337 if (rlen > block->buf_len) {
338 Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
339 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
342 stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
343 if (stat != (ssize_t)rlen) {
344 Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
345 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
348 /* Setup write pointers */
349 block->binbuf = rlen;
350 block->bufp = block->buf + block->binbuf;
351 block->FirstIndex = hdr.FirstIndex;
352 block->LastIndex = hdr.LastIndex;
353 block->VolSessionId = dcr->jcr->VolSessionId;
354 block->VolSessionTime = dcr->jcr->VolSessionTime;
355 Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
360 * Write a block to the spool file
362 * Returns: true on success or EOT
363 * false on hard error
365 bool write_block_to_spool_file(DCR *dcr)
367 uint32_t wlen, hlen; /* length to write */
368 bool despool = false;
369 DEV_BLOCK *block = dcr->block;
371 ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
372 if (block->binbuf <= WRITE_BLKHDR_LENGTH) { /* Does block have data in it? */
376 hlen = sizeof(spool_hdr);
377 wlen = block->binbuf;
378 P(dcr->dev->spool_mutex);
379 dcr->job_spool_size += hlen + wlen;
380 dcr->dev->spool_size += hlen + wlen;
381 if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
382 (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
385 V(dcr->dev->spool_mutex);
387 spool_stats.data_size += hlen + wlen;
388 if (spool_stats.data_size > spool_stats.max_data_size) {
389 spool_stats.max_data_size = spool_stats.data_size;
394 char ec1[30], ec2[30], ec3[30], ec4[30];
395 Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
396 "max_job_size=%s job_size=%s\n",
397 edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
398 edit_uint64_with_commas(dcr->job_spool_size, ec2),
399 edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
400 edit_uint64_with_commas(dcr->dev->spool_size, ec4));
402 Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
403 if (!despool_data(dcr, false)) {
404 Pmsg0(000, _("Bad return from despool in write_block.\n"));
407 /* Despooling cleared these variables so reset them */
408 P(dcr->dev->spool_mutex);
409 dcr->job_spool_size += hlen + wlen;
410 dcr->dev->spool_size += hlen + wlen;
411 V(dcr->dev->spool_mutex);
412 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
416 if (!write_spool_header(dcr)) {
419 if (!write_spool_data(dcr)) {
423 Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
428 static bool write_spool_header(DCR *dcr)
432 DEV_BLOCK *block = dcr->block;
434 hdr.FirstIndex = block->FirstIndex;
435 hdr.LastIndex = block->LastIndex;
436 hdr.len = block->binbuf;
439 for (int retry=0; retry<=1; retry++) {
440 stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
443 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
446 if (stat != (ssize_t)sizeof(hdr)) {
447 /* If we wrote something, truncate it, then despool */
449 if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR) - stat) != 0) {
451 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
456 if (!despool_data(dcr, false)) {
457 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
460 continue; /* try again */
464 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
468 static bool write_spool_data(DCR *dcr)
471 DEV_BLOCK *block = dcr->block;
474 for (int retry=0; retry<=1; retry++) {
475 stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
478 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
481 if (stat != (ssize_t)block->binbuf) {
483 * If we wrote something, truncate it and the header, then despool
486 if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR)
487 - stat - sizeof(spool_hdr)) != 0) {
489 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
494 if (!despool_data(dcr, false)) {
495 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
498 if (!write_spool_header(dcr)) {
501 continue; /* try again */
505 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
511 bool are_attributes_spooled(JCR *jcr)
513 return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
517 * Create spool file for attributes.
518 * This is done by "attaching" to the bsock, and when
519 * it is called, the output is written to a file.
520 * The actual spooling is turned on and off in
521 * append.c only during writing of the attributes.
523 bool begin_attribute_spool(JCR *jcr)
525 if (!jcr->no_attributes && jcr->spool_attributes) {
526 return open_attr_spool_file(jcr, jcr->dir_bsock);
531 bool discard_attribute_spool(JCR *jcr)
533 if (are_attributes_spooled(jcr)) {
534 return close_attr_spool_file(jcr, jcr->dir_bsock);
539 static void update_attr_spool_size(ssize_t size)
543 if ((spool_stats.attr_size - size) > 0) {
544 spool_stats.attr_size -= size;
546 spool_stats.attr_size = 0;
552 bool commit_attribute_spool(JCR *jcr)
557 if (are_attributes_spooled(jcr)) {
558 if (fseeko(jcr->dir_bsock->spool_fd, 0, SEEK_END) != 0) {
560 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
564 size = ftello(jcr->dir_bsock->spool_fd);
567 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
572 if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
573 spool_stats.max_attr_size = spool_stats.attr_size + size;
575 spool_stats.attr_size += size;
577 Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
578 edit_uint64_with_commas(size, ec1));
579 bnet_despool_to_bsock(jcr->dir_bsock, update_attr_spool_size, size);
580 return close_attr_spool_file(jcr, jcr->dir_bsock);
585 close_attr_spool_file(jcr, jcr->dir_bsock);
589 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
591 Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
596 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
598 POOLMEM *name = get_pool_memory(PM_MESSAGE);
600 make_unique_spool_filename(jcr, &name, bs->fd);
601 bs->spool_fd = fopen(name, "w+b");
604 Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
606 free_pool_memory(name);
610 spool_stats.attr_jobs++;
612 free_pool_memory(name);
616 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
623 name = get_pool_memory(PM_MESSAGE);
625 spool_stats.attr_jobs--;
626 spool_stats.total_attr_jobs++;
628 make_unique_spool_filename(jcr, &name, bs->fd);
629 fclose(bs->spool_fd);
631 free_pool_memory(name);