4 * Kern Sibbald, March 2004
9 Copyright (C) 2004-2006 Kern Sibbald
11 This program is free software; you can redistribute it and/or
12 modify it under the terms of the GNU General Public License
13 version 2 as amended with additional clauses defined in the
14 file LICENSE in the main source directory.
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 the file LICENSE for additional details.
26 /* Forward referenced subroutines */
27 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
28 static bool open_data_spool_file(DCR *dcr);
29 static bool close_data_spool_file(DCR *dcr);
30 static bool despool_data(DCR *dcr, bool commit);
31 static int read_block_from_spool_file(DCR *dcr);
32 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
33 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
34 static bool write_spool_header(DCR *dcr);
35 static bool write_spool_data(DCR *dcr);
37 struct spool_stats_t {
38 uint32_t data_jobs; /* current jobs spooling data */
40 uint32_t total_data_jobs; /* total jobs to have spooled data */
41 uint32_t total_attr_jobs;
42 int64_t max_data_size; /* max data size */
43 int64_t max_attr_size;
44 int64_t data_size; /* current data size (all jobs running) */
48 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
49 spool_stats_t spool_stats;
52 * Header for data spool record */
54 int32_t FirstIndex; /* FirstIndex for buffer */
55 int32_t LastIndex; /* LastIndex for buffer */
56 uint32_t len; /* length of next buffer */
65 void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
67 char *msg, ed1[30], ed2[30];
70 msg = (char *)get_pool_memory(PM_MESSAGE);
72 if (spool_stats.data_jobs || spool_stats.max_data_size) {
73 len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
74 spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
75 spool_stats.total_data_jobs,
76 edit_uint64_with_commas(spool_stats.max_data_size, ed2));
78 sendit(msg, len, arg);
80 if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
81 len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
82 spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
83 spool_stats.total_attr_jobs,
84 edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
86 sendit(msg, len, arg);
89 free_pool_memory(msg);
92 bool begin_data_spool(DCR *dcr)
95 if (!dcr->dev->is_dvd() && dcr->jcr->spool_data) {
96 Dmsg0(100, "Turning on data spooling\n");
97 dcr->spool_data = true;
98 stat = open_data_spool_file(dcr);
100 dcr->spooling = true;
101 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
103 spool_stats.data_jobs++;
110 bool discard_data_spool(DCR *dcr)
113 Dmsg0(100, "Data spooling discarded\n");
114 return close_data_spool_file(dcr);
119 bool commit_data_spool(DCR *dcr)
124 Dmsg0(100, "Committing spooled data\n");
125 stat = despool_data(dcr, true /*commit*/);
127 Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
128 close_data_spool_file(dcr);
131 return close_data_spool_file(dcr);
136 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
139 if (dcr->dev->device->spool_directory) {
140 dir = dcr->dev->device->spool_directory;
142 dir = working_directory;
144 Mmsg(name, "%s/%s.data.%s.%s.spool", dir, my_name, dcr->jcr->Job,
145 dcr->device->hdr.name);
149 static bool open_data_spool_file(DCR *dcr)
151 POOLMEM *name = get_pool_memory(PM_MESSAGE);
154 make_unique_data_spool_filename(dcr, &name);
155 if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
156 dcr->spool_fd = spool_fd;
157 dcr->jcr->spool_attributes = true;
160 Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
162 free_pool_memory(name);
165 Dmsg1(100, "Created spool file: %s\n", name);
166 free_pool_memory(name);
170 static bool close_data_spool_file(DCR *dcr)
172 POOLMEM *name = get_pool_memory(PM_MESSAGE);
175 spool_stats.data_jobs--;
176 spool_stats.total_data_jobs++;
177 if (spool_stats.data_size < dcr->job_spool_size) {
178 spool_stats.data_size = 0;
180 spool_stats.data_size -= dcr->job_spool_size;
182 dcr->job_spool_size = 0;
185 make_unique_data_spool_filename(dcr, &name);
186 close(dcr->spool_fd);
188 dcr->spooling = false;
190 Dmsg1(100, "Deleted spool file: %s\n", name);
191 free_pool_memory(name);
195 static const char *spool_name = "*spool*";
197 static bool despool_data(DCR *dcr, bool commit)
207 Dmsg0(100, "Despooling data\n");
208 /* Commit means that the job is done, so we commit, otherwise, we
209 * are despooling because of user spool size max or some error
210 * (e.g. filesystem full).
213 Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
214 jcr->dcr->VolumeName,
215 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
217 Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
218 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
220 dcr->despool_wait = true;
221 dcr->spooling = false;
222 lock_device(dcr->dev);
223 dcr->despool_wait = false;
224 dcr->despooling = true;
225 dcr->dev_locked = true;
228 * This is really quite kludgy and should be fixed some time.
229 * We create a dev structure to read from the spool file
232 rdev = (DEVICE *)malloc(sizeof(DEVICE));
233 memset(rdev, 0, sizeof(DEVICE));
234 rdev->dev_name = get_memory(strlen(spool_name)+1);
235 bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
236 rdev->errmsg = get_pool_memory(PM_EMSG);
238 rdev->max_block_size = dcr->dev->max_block_size;
239 rdev->min_block_size = dcr->dev->min_block_size;
240 rdev->device = dcr->dev->device;
241 rdcr = new_dcr(NULL, rdev);
242 rdcr->spool_fd = dcr->spool_fd;
243 rdcr->jcr = jcr; /* set a valid jcr */
244 block = dcr->block; /* save block */
245 dcr->block = rdcr->block; /* make read and write block the same */
247 Dmsg1(800, "read/write block size = %d\n", block->buf_len);
248 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
250 /* Add run time, to get current wait time */
251 time_t despool_start = time(NULL) - jcr->run_time;
254 if (job_canceled(jcr)) {
258 stat = read_block_from_spool_file(rdcr);
259 if (stat == RB_EOT) {
261 } else if (stat == RB_ERROR) {
265 ok = write_block_to_device(dcr);
267 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
268 dcr->dev->print_name(), dcr->dev->bstrerror());
270 Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
273 /* Subtracting run_time give us elapsed time - wait_time since we started despooling */
274 time_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
276 if (despool_elapsed <= 0) {
280 Jmsg(dcr->jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s bytes/second\n"),
281 despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
282 edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
284 dcr->block = block; /* reset block */
286 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
287 if (ftruncate(rdcr->spool_fd, 0) != 0) {
289 Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
291 Pmsg1(000, _("Bad return from ftruncate. ERR=%s\n"), be.strerror());
296 if (spool_stats.data_size < dcr->job_spool_size) {
297 spool_stats.data_size = 0;
299 spool_stats.data_size -= dcr->job_spool_size;
302 P(dcr->dev->spool_mutex);
303 dcr->dev->spool_size -= dcr->job_spool_size;
304 dcr->job_spool_size = 0; /* zap size in input dcr */
305 V(dcr->dev->spool_mutex);
306 free_memory(rdev->dev_name);
307 free_pool_memory(rdev->errmsg);
308 /* Be careful to NULL the jcr and free rdev after free_dcr() */
312 dcr->dev_locked = false;
313 dcr->spooling = true; /* turn on spooling again */
314 dcr->despooling = false;
315 unlock_device(dcr->dev);
320 * Read a block from the spool file
322 * Returns RB_OK on success
323 * RB_EOT when file done
326 static int read_block_from_spool_file(DCR *dcr)
331 DEV_BLOCK *block = dcr->block;
334 stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
336 Dmsg0(100, "EOT on spool read.\n");
338 } else if (stat != (ssize_t)rlen) {
341 Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
344 Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
345 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
350 if (rlen > block->buf_len) {
351 Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
352 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
355 stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
356 if (stat != (ssize_t)rlen) {
357 Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
358 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
361 /* Setup write pointers */
362 block->binbuf = rlen;
363 block->bufp = block->buf + block->binbuf;
364 block->FirstIndex = hdr.FirstIndex;
365 block->LastIndex = hdr.LastIndex;
366 block->VolSessionId = dcr->jcr->VolSessionId;
367 block->VolSessionTime = dcr->jcr->VolSessionTime;
368 Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
373 * Write a block to the spool file
375 * Returns: true on success or EOT
376 * false on hard error
378 bool write_block_to_spool_file(DCR *dcr)
380 uint32_t wlen, hlen; /* length to write */
381 bool despool = false;
382 DEV_BLOCK *block = dcr->block;
384 ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
385 if (block->binbuf <= WRITE_BLKHDR_LENGTH) { /* Does block have data in it? */
389 hlen = sizeof(spool_hdr);
390 wlen = block->binbuf;
391 P(dcr->dev->spool_mutex);
392 dcr->job_spool_size += hlen + wlen;
393 dcr->dev->spool_size += hlen + wlen;
394 if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
395 (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
398 V(dcr->dev->spool_mutex);
400 spool_stats.data_size += hlen + wlen;
401 if (spool_stats.data_size > spool_stats.max_data_size) {
402 spool_stats.max_data_size = spool_stats.data_size;
407 char ec1[30], ec2[30], ec3[30], ec4[30];
408 Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
409 "max_job_size=%s job_size=%s\n",
410 edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
411 edit_uint64_with_commas(dcr->job_spool_size, ec2),
412 edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
413 edit_uint64_with_commas(dcr->dev->spool_size, ec4));
415 Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
416 if (!despool_data(dcr, false)) {
417 Pmsg0(000, _("Bad return from despool in write_block.\n"));
420 /* Despooling cleared these variables so reset them */
421 P(dcr->dev->spool_mutex);
422 dcr->job_spool_size += hlen + wlen;
423 dcr->dev->spool_size += hlen + wlen;
424 V(dcr->dev->spool_mutex);
425 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
429 if (!write_spool_header(dcr)) {
432 if (!write_spool_data(dcr)) {
436 Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
441 static bool write_spool_header(DCR *dcr)
445 DEV_BLOCK *block = dcr->block;
447 hdr.FirstIndex = block->FirstIndex;
448 hdr.LastIndex = block->LastIndex;
449 hdr.len = block->binbuf;
452 for (int retry=0; retry<=1; retry++) {
453 stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
456 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
459 if (stat != (ssize_t)sizeof(hdr)) {
460 /* If we wrote something, truncate it, then despool */
462 if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR) - stat) != 0) {
464 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
469 if (!despool_data(dcr, false)) {
470 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
473 continue; /* try again */
477 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
481 static bool write_spool_data(DCR *dcr)
484 DEV_BLOCK *block = dcr->block;
487 for (int retry=0; retry<=1; retry++) {
488 stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
491 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
494 if (stat != (ssize_t)block->binbuf) {
496 * If we wrote something, truncate it and the header, then despool
499 if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR)
500 - stat - sizeof(spool_hdr)) != 0) {
502 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
507 if (!despool_data(dcr, false)) {
508 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
511 if (!write_spool_header(dcr)) {
514 continue; /* try again */
518 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
524 bool are_attributes_spooled(JCR *jcr)
526 return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
530 * Create spool file for attributes.
531 * This is done by "attaching" to the bsock, and when
532 * it is called, the output is written to a file.
533 * The actual spooling is turned on and off in
534 * append.c only during writing of the attributes.
536 bool begin_attribute_spool(JCR *jcr)
538 if (!jcr->no_attributes && jcr->spool_attributes) {
539 return open_attr_spool_file(jcr, jcr->dir_bsock);
544 bool discard_attribute_spool(JCR *jcr)
546 if (are_attributes_spooled(jcr)) {
547 return close_attr_spool_file(jcr, jcr->dir_bsock);
552 static void update_attr_spool_size(ssize_t size)
556 if ((spool_stats.attr_size - size) > 0) {
557 spool_stats.attr_size -= size;
559 spool_stats.attr_size = 0;
565 bool commit_attribute_spool(JCR *jcr)
570 if (are_attributes_spooled(jcr)) {
571 if (fseeko(jcr->dir_bsock->spool_fd, 0, SEEK_END) != 0) {
573 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
577 size = ftello(jcr->dir_bsock->spool_fd);
580 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
585 if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
586 spool_stats.max_attr_size = spool_stats.attr_size + size;
588 spool_stats.attr_size += size;
590 Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
591 edit_uint64_with_commas(size, ec1));
592 bnet_despool_to_bsock(jcr->dir_bsock, update_attr_spool_size, size);
593 return close_attr_spool_file(jcr, jcr->dir_bsock);
598 close_attr_spool_file(jcr, jcr->dir_bsock);
602 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
604 Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
609 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
611 POOLMEM *name = get_pool_memory(PM_MESSAGE);
613 make_unique_spool_filename(jcr, &name, bs->fd);
614 bs->spool_fd = fopen(name, "w+b");
617 Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
619 free_pool_memory(name);
623 spool_stats.attr_jobs++;
625 free_pool_memory(name);
629 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
636 name = get_pool_memory(PM_MESSAGE);
638 spool_stats.attr_jobs--;
639 spool_stats.total_attr_jobs++;
641 make_unique_spool_filename(jcr, &name, bs->fd);
642 fclose(bs->spool_fd);
644 free_pool_memory(name);