4 * Kern Sibbald, March 2004
9 Copyright (C) 2004-2005 Kern Sibbald
11 This program is free software; you can redistribute it and/or
12 modify it under the terms of the GNU General Public License
13 version 2 as amended with additional clauses defined in the
14 file LICENSE in the main source directory.
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 the file LICENSE for additional details.
26 /* Forward referenced subroutines */
27 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
28 static bool open_data_spool_file(DCR *dcr);
29 static bool close_data_spool_file(DCR *dcr);
30 static bool despool_data(DCR *dcr, bool commit);
31 static int read_block_from_spool_file(DCR *dcr);
32 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
33 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
34 static bool write_spool_header(DCR *dcr);
35 static bool write_spool_data(DCR *dcr);
37 struct spool_stats_t {
38 uint32_t data_jobs; /* current jobs spooling data */
40 uint32_t total_data_jobs; /* total jobs to have spooled data */
41 uint32_t total_attr_jobs;
42 int64_t max_data_size; /* max data size */
43 int64_t max_attr_size;
44 int64_t data_size; /* current data size (all jobs running) */
48 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
49 spool_stats_t spool_stats;
52 * Header for data spool record */
54 int32_t FirstIndex; /* FirstIndex for buffer */
55 int32_t LastIndex; /* LastIndex for buffer */
56 uint32_t len; /* length of next buffer */
65 void list_spool_stats(BSOCK *bs)
67 char ed1[30], ed2[30];
68 if (spool_stats.data_jobs || spool_stats.max_data_size) {
69 bnet_fsend(bs, "Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n",
70 spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
71 spool_stats.total_data_jobs,
72 edit_uint64_with_commas(spool_stats.max_data_size, ed2));
74 if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
75 bnet_fsend(bs, "Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n",
76 spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
77 spool_stats.total_attr_jobs,
78 edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
82 bool begin_data_spool(DCR *dcr)
85 if (dcr->jcr->spool_data) {
86 Dmsg0(100, "Turning on data spooling\n");
87 dcr->spool_data = true;
88 stat = open_data_spool_file(dcr);
91 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
93 spool_stats.data_jobs++;
100 bool discard_data_spool(DCR *dcr)
103 Dmsg0(100, "Data spooling discarded\n");
104 return close_data_spool_file(dcr);
109 bool commit_data_spool(DCR *dcr)
114 Dmsg0(100, "Committing spooled data\n");
115 stat = despool_data(dcr, true /*commit*/);
117 Pmsg1(000, "Bad return from despool WroteVol=%d\n", dcr->WroteVol);
118 close_data_spool_file(dcr);
121 return close_data_spool_file(dcr);
126 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
129 if (dcr->dev->device->spool_directory) {
130 dir = dcr->dev->device->spool_directory;
132 dir = working_directory;
134 Mmsg(name, "%s/%s.data.%s.%s.spool", dir, my_name, dcr->jcr->Job,
135 dcr->device->hdr.name);
139 static bool open_data_spool_file(DCR *dcr)
141 POOLMEM *name = get_pool_memory(PM_MESSAGE);
144 make_unique_data_spool_filename(dcr, &name);
145 if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
146 dcr->spool_fd = spool_fd;
147 dcr->jcr->spool_attributes = true;
150 Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
152 free_pool_memory(name);
155 Dmsg1(100, "Created spool file: %s\n", name);
156 free_pool_memory(name);
160 static bool close_data_spool_file(DCR *dcr)
162 POOLMEM *name = get_pool_memory(PM_MESSAGE);
165 spool_stats.data_jobs--;
166 spool_stats.total_data_jobs++;
167 if (spool_stats.data_size < dcr->job_spool_size) {
168 spool_stats.data_size = 0;
170 spool_stats.data_size -= dcr->job_spool_size;
172 dcr->job_spool_size = 0;
175 make_unique_data_spool_filename(dcr, &name);
176 close(dcr->spool_fd);
178 dcr->spooling = false;
180 Dmsg1(100, "Deleted spool file: %s\n", name);
181 free_pool_memory(name);
185 static const char *spool_name = "*spool*";
187 static bool despool_data(DCR *dcr, bool commit)
197 Dmsg0(100, "Despooling data\n");
198 Jmsg(jcr, M_INFO, 0, _("%s spooled data to Volume. Despooling %s bytes ...\n"),
199 commit?"Committing":"Writing",
200 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
201 dcr->spooling = false;
202 lock_device(dcr->dev);
203 dcr->dev_locked = true;
206 * This is really quite kludgy and should be fixed some time.
207 * We create a dev structure to read from the spool file
210 rdev = (DEVICE *)malloc(sizeof(DEVICE));
211 memset(rdev, 0, sizeof(DEVICE));
212 rdev->dev_name = get_memory(strlen(spool_name)+1);
213 bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
214 rdev->errmsg = get_pool_memory(PM_EMSG);
216 rdev->max_block_size = dcr->dev->max_block_size;
217 rdev->min_block_size = dcr->dev->min_block_size;
218 rdev->device = dcr->dev->device;
219 rdcr = new_dcr(NULL, rdev);
220 rdcr->spool_fd = dcr->spool_fd;
221 rdcr->jcr = jcr; /* set a valid jcr */
222 block = dcr->block; /* save block */
223 dcr->block = rdcr->block; /* make read and write block the same */
225 Dmsg1(800, "read/write block size = %d\n", block->buf_len);
226 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
229 if (job_canceled(jcr)) {
233 stat = read_block_from_spool_file(rdcr);
234 if (stat == RB_EOT) {
236 } else if (stat == RB_ERROR) {
240 ok = write_block_to_device(dcr);
242 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
243 dcr->dev->print_name(), strerror_dev(dcr->dev));
245 Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
247 dcr->block = block; /* reset block */
249 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
250 if (ftruncate(rdcr->spool_fd, 0) != 0) {
252 Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
254 Pmsg1(000, "Bad return from ftruncate. ERR=%s\n", be.strerror());
259 if (spool_stats.data_size < dcr->job_spool_size) {
260 spool_stats.data_size = 0;
262 spool_stats.data_size -= dcr->job_spool_size;
265 P(dcr->dev->spool_mutex);
266 dcr->dev->spool_size -= dcr->job_spool_size;
267 dcr->job_spool_size = 0; /* zap size in input dcr */
268 V(dcr->dev->spool_mutex);
269 free_memory(rdev->dev_name);
270 free_pool_memory(rdev->errmsg);
271 /* Be careful to NULL the jcr and free rdev after free_dcr() */
275 unlock_device(dcr->dev);
276 dcr->dev_locked = false;
277 dcr->spooling = true; /* turn on spooling again */
282 * Read a block from the spool file
284 * Returns RB_OK on success
285 * RB_EOT when file done
288 static int read_block_from_spool_file(DCR *dcr)
293 DEV_BLOCK *block = dcr->block;
296 stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
298 Dmsg0(100, "EOT on spool read.\n");
300 } else if (stat != (ssize_t)rlen) {
303 Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
306 Pmsg2(000, "Spool read error. Wanted %u bytes, got %d\n", rlen, stat);
307 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
312 if (rlen > block->buf_len) {
313 Pmsg2(000, "Spool block too big. Max %u bytes, got %u\n", block->buf_len, rlen);
314 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
317 stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
318 if (stat != (ssize_t)rlen) {
319 Pmsg2(000, "Spool data read error. Wanted %u bytes, got %d\n", rlen, stat);
320 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
323 /* Setup write pointers */
324 block->binbuf = rlen;
325 block->bufp = block->buf + block->binbuf;
326 block->FirstIndex = hdr.FirstIndex;
327 block->LastIndex = hdr.LastIndex;
328 block->VolSessionId = dcr->jcr->VolSessionId;
329 block->VolSessionTime = dcr->jcr->VolSessionTime;
330 Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
335 * Write a block to the spool file
337 * Returns: true on success or EOT
338 * false on hard error
340 bool write_block_to_spool_file(DCR *dcr)
342 uint32_t wlen, hlen; /* length to write */
343 bool despool = false;
344 DEV_BLOCK *block = dcr->block;
346 ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
347 if (block->binbuf <= WRITE_BLKHDR_LENGTH) { /* Does block have data in it? */
351 hlen = sizeof(spool_hdr);
352 wlen = block->binbuf;
353 P(dcr->dev->spool_mutex);
354 dcr->job_spool_size += hlen + wlen;
355 dcr->dev->spool_size += hlen + wlen;
356 if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
357 (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
360 V(dcr->dev->spool_mutex);
362 spool_stats.data_size += hlen + wlen;
363 if (spool_stats.data_size > spool_stats.max_data_size) {
364 spool_stats.max_data_size = spool_stats.data_size;
369 char ec1[30], ec2[30], ec3[30], ec4[30];
370 Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
371 "max_job_size=%s job_size=%s\n",
372 edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
373 edit_uint64_with_commas(dcr->job_spool_size, ec2),
374 edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
375 edit_uint64_with_commas(dcr->dev->spool_size, ec4));
377 Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
378 if (!despool_data(dcr, false)) {
379 Pmsg0(000, "Bad return from despool in write_block.\n");
382 /* Despooling cleared these variables so reset them */
383 P(dcr->dev->spool_mutex);
384 dcr->job_spool_size += hlen + wlen;
385 dcr->dev->spool_size += hlen + wlen;
386 V(dcr->dev->spool_mutex);
387 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
391 if (!write_spool_header(dcr)) {
394 if (!write_spool_data(dcr)) {
398 Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
403 static bool write_spool_header(DCR *dcr)
407 DEV_BLOCK *block = dcr->block;
409 hdr.FirstIndex = block->FirstIndex;
410 hdr.LastIndex = block->LastIndex;
411 hdr.len = block->binbuf;
414 for (int retry=0; retry<=1; retry++) {
415 stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
418 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
421 if (stat != (ssize_t)sizeof(hdr)) {
422 /* If we wrote something, truncate it, then despool */
424 if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR) - stat) != 0) {
426 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
431 if (!despool_data(dcr, false)) {
432 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
435 continue; /* try again */
439 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
443 static bool write_spool_data(DCR *dcr)
446 DEV_BLOCK *block = dcr->block;
449 for (int retry=0; retry<=1; retry++) {
450 stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
453 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
456 if (stat != (ssize_t)block->binbuf) {
458 * If we wrote something, truncate it and the header, then despool
461 if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR)
462 - stat - sizeof(spool_hdr)) != 0) {
464 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
469 if (!despool_data(dcr, false)) {
470 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
473 if (!write_spool_header(dcr)) {
476 continue; /* try again */
480 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
486 bool are_attributes_spooled(JCR *jcr)
488 return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
492 * Create spool file for attributes.
493 * This is done by "attaching" to the bsock, and when
494 * it is called, the output is written to a file.
495 * The actual spooling is turned on and off in
496 * append.c only during writing of the attributes.
498 bool begin_attribute_spool(JCR *jcr)
500 if (!jcr->no_attributes && jcr->spool_attributes) {
501 return open_attr_spool_file(jcr, jcr->dir_bsock);
506 bool discard_attribute_spool(JCR *jcr)
508 if (are_attributes_spooled(jcr)) {
509 return close_attr_spool_file(jcr, jcr->dir_bsock);
514 static void update_attr_spool_size(ssize_t size)
518 if ((spool_stats.attr_size - size) > 0) {
519 spool_stats.attr_size -= size;
521 spool_stats.attr_size = 0;
527 bool commit_attribute_spool(JCR *jcr)
532 if (are_attributes_spooled(jcr)) {
533 if (fseeko(jcr->dir_bsock->spool_fd, 0, SEEK_END) != 0) {
535 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
539 size = ftello(jcr->dir_bsock->spool_fd);
542 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
547 if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
548 spool_stats.max_attr_size = spool_stats.attr_size + size;
550 spool_stats.attr_size += size;
552 Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
553 edit_uint64_with_commas(size, ec1));
554 bnet_despool_to_bsock(jcr->dir_bsock, update_attr_spool_size, size);
555 return close_attr_spool_file(jcr, jcr->dir_bsock);
560 close_attr_spool_file(jcr, jcr->dir_bsock);
564 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
566 Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
571 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
573 POOLMEM *name = get_pool_memory(PM_MESSAGE);
575 make_unique_spool_filename(jcr, &name, bs->fd);
576 bs->spool_fd = fopen(name, "w+");
579 Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
581 free_pool_memory(name);
585 spool_stats.attr_jobs++;
587 free_pool_memory(name);
591 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
598 name = get_pool_memory(PM_MESSAGE);
600 spool_stats.attr_jobs--;
601 spool_stats.total_attr_jobs++;
603 make_unique_spool_filename(jcr, &name, bs->fd);
604 fclose(bs->spool_fd);
606 free_pool_memory(name);