4 * Kern Sibbald, March 2004
9 Copyright (C) 2004-2005 Kern Sibbald
11 This program is free software; you can redistribute it and/or
12 modify it under the terms of the GNU General Public License
13 version 2 as amended with additional clauses defined in the
14 file LICENSE in the main source directory.
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 file LICENSE for additional details.
26 /* Forward referenced subroutines */
/* Data spool file: naming, open/close, and despooling back to the device */
27 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
28 static bool open_data_spool_file(DCR *dcr);
29 static bool close_data_spool_file(DCR *dcr);
30 static bool despool_data(DCR *dcr, bool commit);
31 static int read_block_from_spool_file(DCR *dcr);
/* Attribute spool file: a FILE kept in the BSOCK's spool_fd member */
32 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
33 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
/* Writers for one spooled record: the header first, then the block data */
34 static bool write_spool_header(DCR *dcr);
35 static bool write_spool_data(DCR *dcr);
/*
 * Counters reported by list_spool_stats(): job counts and byte totals for
 * data spooling and for attribute spooling.
 */
37 struct spool_stats_t {
38 uint32_t data_jobs; /* current jobs spooling data */
40 uint32_t total_data_jobs; /* total jobs to have spooled data */
41 uint32_t total_attr_jobs; /* incremented each time close_attr_spool_file() runs */
42 int64_t max_data_size; /* max data size */
43 int64_t max_attr_size; /* highest attr_size seen; raised in commit_attribute_spool() */
44 int64_t data_size; /* current data size (all jobs running) */
/* NOTE(review): presumably serializes updates to spool_stats — confirm at the lock/unlock call sites */
48 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/* The single statistics instance shared by every spooling job; not declared static */
49 spool_stats_t spool_stats;
52 * Header for data spool record */
/*
 * These fields match the hdr.FirstIndex / hdr.LastIndex / hdr.len accesses
 * in write_spool_header() (filled from the DEV_BLOCK) and the hdr.* reads
 * in read_block_from_spool_file().
 */
54 int32_t FirstIndex; /* FirstIndex for buffer */
55 int32_t LastIndex; /* LastIndex for buffer */
56 uint32_t len; /* length of next buffer */
/*
 * Send a summary of the spooling statistics over bs.
 *
 * A "Data spooling" line is sent when data_jobs or max_data_size is
 * non-zero, and an "Attr spooling" line when attr_jobs or max_attr_size is
 * non-zero.
 *
 * NOTE(review): the data line labels max_data_size as "max bytes/job", but
 * write_block_to_spool_file() raises max_data_size from data_size, which is
 * documented as the total for all running jobs — the label looks wrong.
 */
65 void list_spool_stats(BSOCK *bs)
67 char ed1[30], ed2[30];
68 if (spool_stats.data_jobs || spool_stats.max_data_size) {
69 bnet_fsend(bs, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
70 spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
71 spool_stats.total_data_jobs,
72 edit_uint64_with_commas(spool_stats.max_data_size, ed2));
/* ed1/ed2 are reused as the edit buffers for the attribute line */
74 if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
75 bnet_fsend(bs, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
76 spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
77 spool_stats.total_attr_jobs,
78 edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
/*
 * Start data spooling for this dcr when the job asks for it.
 *
 * When dcr->jcr->spool_data is set, the dcr's own spool_data flag is turned
 * on and the data spool file is opened, with the result kept in stat.  The
 * function also emits the "Spooling data ..." job message and increments
 * spool_stats.data_jobs.
 *
 * NOTE(review): presumably the message and the counter update happen only
 * when the open succeeded, and stat is what gets returned — confirm.
 */
82 bool begin_data_spool(DCR *dcr)
85 if (dcr->jcr->spool_data) {
86 Dmsg0(100, "Turning on data spooling\n");
87 dcr->spool_data = true;
88 stat = open_data_spool_file(dcr);
91 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
93 spool_stats.data_jobs++;
/*
 * Discard the data spool: log the discard and close the spool file through
 * close_data_spool_file(), whose result is returned.
 */
100 bool discard_data_spool(DCR *dcr)
103 Dmsg0(100, "Data spooling discarded\n");
104 return close_data_spool_file(dcr);
/*
 * Commit the data spool: despool everything with commit=true, then close
 * the spool file.
 *
 * Two close paths are visible.  After the "Bad return from despool" debug
 * message the spool file is closed and the close result is dropped; the
 * other path returns close_data_spool_file()'s result directly.
 *
 * NOTE(review): presumably the first path is the despool-failure branch and
 * returns false — confirm.
 */
109 bool commit_data_spool(DCR *dcr)
114 Dmsg0(100, "Committing spooled data\n");
115 stat = despool_data(dcr, true /*commit*/);
117 Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
118 close_data_spool_file(dcr);
121 return close_data_spool_file(dcr);
/*
 * Build the data spool file path for this dcr into *name.
 *
 * The directory is the device resource's spool_directory when one is set,
 * otherwise working_directory.  The file name is
 * "<my_name>.data.<job name>.<device resource name>.spool".
 *
 * NOTE(review): the directory is read through dcr->dev->device while the
 * name is read through dcr->device — presumably the same resource; confirm.
 */
126 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
129 if (dcr->dev->device->spool_directory) {
130 dir = dcr->dev->device->spool_directory;
132 dir = working_directory;
134 Mmsg(name, "%s/%s.data.%s.%s.spool", dir, my_name, dcr->jcr->Job,
135 dcr->device->hdr.name);
/*
 * Create (or truncate) the data spool file for this dcr and store its
 * descriptor in dcr->spool_fd.
 *
 * Side effect: a successful open also switches dcr->jcr->spool_attributes
 * on.  When the open fails an M_FATAL job message naming the file is
 * issued.  The pool buffer holding the file name is released on both paths.
 *
 * NOTE(review): presumably returns false on the failure path and true
 * otherwise — confirm.
 */
139 static bool open_data_spool_file(DCR *dcr)
141 POOLMEM *name = get_pool_memory(PM_MESSAGE);
144 make_unique_data_spool_filename(dcr, &name);
/* O_TRUNC empties any existing file of the same name; 0640 = owner rw, group r */
145 if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
146 dcr->spool_fd = spool_fd;
147 dcr->jcr->spool_attributes = true;
150 Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
152 free_pool_memory(name);
155 Dmsg1(100, "Created spool file: %s\n", name);
156 free_pool_memory(name);
/*
 * Close the data spool file and settle the global statistics.
 *
 * Decrements the active data-job count, increments the total, and removes
 * this job's spooled bytes from spool_stats.data_size (clamped at zero
 * rather than going negative).  Then resets dcr->job_spool_size, closes the
 * descriptor and clears dcr->spooling.
 *
 * NOTE(review): the file name is rebuilt and "Deleted spool file" is
 * logged, so the file is presumably unlinked after close() — confirm.
 * NOTE(review): the return value of close() is not checked.
 */
160 static bool close_data_spool_file(DCR *dcr)
162 POOLMEM *name = get_pool_memory(PM_MESSAGE);
165 spool_stats.data_jobs--;
166 spool_stats.total_data_jobs++;
/* Clamp: never let the global byte count drop below zero */
167 if (spool_stats.data_size < dcr->job_spool_size) {
168 spool_stats.data_size = 0;
170 spool_stats.data_size -= dcr->job_spool_size;
172 dcr->job_spool_size = 0;
175 make_unique_data_spool_filename(dcr, &name);
176 close(dcr->spool_fd);
178 dcr->spooling = false;
180 Dmsg1(100, "Deleted spool file: %s\n", name);
181 free_pool_memory(name);
/* Device name copied into the temporary read device that despool_data() builds */
185 static const char *spool_name = "*spool*";
/*
 * Copy the blocks held in the data spool file to the real device, then
 * empty the spool file and give back the accounted spool space.
 *
 *   dcr     write context; its spool_fd is the file that is read back
 *   commit  NOTE(review): presumably selects between the two M_INFO
 *           messages below (the first names the Volume) — confirm
 *
 * dcr->spooling is switched off and the device is locked while this runs;
 * both are restored at the end.  A temporary DEVICE/DCR pair (rdev/rdcr) is
 * built so read_block_from_spool_file() can fill a block, and the write dcr
 * is pointed at that same block for write_block_to_device().
 *
 * NOTE(review): presumably returns ok — confirm.
 */
187 static bool despool_data(DCR *dcr, bool commit)
197 Dmsg0(100, "Despooling data\n");
/* NOTE(review): the messages read jcr->dcr rather than the dcr argument — presumably the same object; confirm */
199 Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
200 jcr->dcr->VolumeName,
201 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
204 Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
205 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
/* Presumably so the device write below is not routed back into the spool — confirm */
207 dcr->spooling = false;
208 lock_device(dcr->dev);
209 dcr->dev_locked = true;
212 * This is really quite kludgy and should be fixed some time.
213 * We create a dev structure to read from the spool file
/* NOTE(review): the malloc() result is passed to memset() without a NULL check */
216 rdev = (DEVICE *)malloc(sizeof(DEVICE));
217 memset(rdev, 0, sizeof(DEVICE));
218 rdev->dev_name = get_memory(strlen(spool_name)+1);
/*
 * NOTE(review): dev_name was just assigned from get_memory(), so
 * sizeof(rdev->dev_name) is the size of a pointer, not of the buffer that
 * was allocated (strlen(spool_name)+1 bytes).  If bstrncpy() limits the copy
 * to that size, the name is cut short wherever a pointer is smaller than
 * 8 bytes.  The allocated length looks like the intended bound.
 */
219 bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
220 rdev->errmsg = get_pool_memory(PM_EMSG);
/* The read device inherits block-size limits and the resource from the real device */
222 rdev->max_block_size = dcr->dev->max_block_size;
223 rdev->min_block_size = dcr->dev->min_block_size;
224 rdev->device = dcr->dev->device;
225 rdcr = new_dcr(NULL, rdev);
226 rdcr->spool_fd = dcr->spool_fd;
227 rdcr->jcr = jcr; /* set a valid jcr */
228 block = dcr->block; /* save block */
229 dcr->block = rdcr->block; /* make read and write block the same */
231 Dmsg1(800, "read/write block size = %d\n", block->buf_len);
/* NOTE(review): lseek() result is not checked */
232 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
235 if (job_canceled(jcr)) {
239 stat = read_block_from_spool_file(rdcr);
240 if (stat == RB_EOT) {
242 } else if (stat == RB_ERROR) {
246 ok = write_block_to_device(dcr);
248 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
249 dcr->dev->print_name(), strerror_dev(dcr->dev));
/*
 * NOTE(review): block still refers to the block saved above, not to
 * dcr->block (== rdcr->block) that was just written, so the FI/LI printed
 * here appear to be the saved block's values.
 */
251 Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
253 dcr->block = block; /* reset block */
/* Empty the spool file so spooling can resume from offset 0 */
255 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
256 if (ftruncate(rdcr->spool_fd, 0) != 0) {
/* A failed truncate is reported as M_ERROR, not M_FATAL */
258 Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
260 Pmsg1(000, _("Bad return from ftruncate. ERR=%s\n"), be.strerror());
/* Same clamp as close_data_spool_file(): the global count never goes negative */
265 if (spool_stats.data_size < dcr->job_spool_size) {
266 spool_stats.data_size = 0;
268 spool_stats.data_size -= dcr->job_spool_size;
/* The per-device total is adjusted under the device's own spool_mutex */
271 P(dcr->dev->spool_mutex);
272 dcr->dev->spool_size -= dcr->job_spool_size;
273 dcr->job_spool_size = 0; /* zap size in input dcr */
274 V(dcr->dev->spool_mutex);
275 free_memory(rdev->dev_name);
276 free_pool_memory(rdev->errmsg);
277 /* Be careful to NULL the jcr and free rdev after free_dcr() */
281 unlock_device(dcr->dev);
282 dcr->dev_locked = false;
283 dcr->spooling = true; /* turn on spooling again */
288 * Read a block from the spool file
290 * Returns RB_OK on success
291 * RB_EOT when file done
/*
 * Read one spooled record from dcr->spool_fd into dcr->block: first the
 * spool header, then the block data.
 *
 * A header read that reports end of file is logged as EOT.  Any read that
 * returns a byte count other than the requested rlen is reported with an
 * M_FATAL job message; short reads are not retried.  A data length larger
 * than block->buf_len is rejected before the data read.
 *
 * NOTE(review): rlen is presumably set to sizeof(hdr) for the first read
 * and to hdr.len for the second — confirm.
 */
294 static int read_block_from_spool_file(DCR *dcr)
299 DEV_BLOCK *block = dcr->block;
302 stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
304 Dmsg0(100, "EOT on spool read.\n");
306 } else if (stat != (ssize_t)rlen) {
309 Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
312 Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
313 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
/* Refuse a record that would not fit in the block buffer read into below */
318 if (rlen > block->buf_len) {
319 Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
320 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
323 stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
324 if (stat != (ssize_t)rlen) {
325 Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
326 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
329 /* Setup write pointers */
330 block->binbuf = rlen;
331 block->bufp = block->buf + block->binbuf;
/* Indexes come from the spooled header; the session id/time come from the jcr */
332 block->FirstIndex = hdr.FirstIndex;
333 block->LastIndex = hdr.LastIndex;
334 block->VolSessionId = dcr->jcr->VolSessionId;
335 block->VolSessionTime = dcr->jcr->VolSessionTime;
336 Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
341 * Write a block to the spool file
343 * Returns: true on success or EOT
344 * false on hard error
/*
 * Append dcr->block to the data spool file as a header plus data record,
 * despooling first when a size limit has been reached.
 *
 * The job and device spool sizes are increased by this record's size before
 * the limits are tested.  A limit of 0 means "no limit" (each test requires
 * max > 0), and a limit counts as reached when size >= max.  After a
 * limit-triggered despool the record's size is added again, because
 * despooling resets those counters.
 *
 * NOTE(review): presumably a block holding no data returns early, and the
 * function returns false when despooling or either write fails — confirm.
 */
346 bool write_block_to_spool_file(DCR *dcr)
348 uint32_t wlen, hlen; /* length to write */
349 bool despool = false;
350 DEV_BLOCK *block = dcr->block;
/* binbuf must agree with the distance bufp has advanced from the buffer start */
352 ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
353 if (block->binbuf <= WRITE_BLKHDR_LENGTH) { /* Does block have data in it? */
357 hlen = sizeof(spool_hdr);
358 wlen = block->binbuf;
/* Account for the record first, then test the limits, all under spool_mutex */
359 P(dcr->dev->spool_mutex);
360 dcr->job_spool_size += hlen + wlen;
361 dcr->dev->spool_size += hlen + wlen;
362 if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
363 (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
366 V(dcr->dev->spool_mutex);
/* Track the global byte count and its high-water mark */
368 spool_stats.data_size += hlen + wlen;
369 if (spool_stats.data_size > spool_stats.max_data_size) {
370 spool_stats.max_data_size = spool_stats.data_size;
375 char ec1[30], ec2[30], ec3[30], ec4[30];
/*
 * NOTE(review): the labels do not line up with the arguments.  "max_size"
 * and "size" receive the per-job values, while "max_job_size" and "job_size"
 * receive the per-device values.
 */
376 Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
377 "max_job_size=%s job_size=%s\n",
378 edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
379 edit_uint64_with_commas(dcr->job_spool_size, ec2),
380 edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
381 edit_uint64_with_commas(dcr->dev->spool_size, ec4));
383 Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
384 if (!despool_data(dcr, false)) {
385 Pmsg0(000, _("Bad return from despool in write_block.\n"));
388 /* Despooling cleared these variables so reset them */
389 P(dcr->dev->spool_mutex);
390 dcr->job_spool_size += hlen + wlen;
391 dcr->dev->spool_size += hlen + wlen;
392 V(dcr->dev->spool_mutex);
393 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
/* Record layout on disk: header, then the block's data */
397 if (!write_spool_header(dcr)) {
400 if (!write_spool_data(dcr)) {
404 Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
/*
 * Write the spool header for dcr->block to the spool file.
 *
 * The header carries the block's FirstIndex, LastIndex and binbuf (as len).
 * The write is attempted at most twice (retry = 0 and 1).  After a short
 * write the file is truncated back by the bytes just written, the spool is
 * despooled, and the write is tried again.  An M_FATAL job message is
 * issued for a write error, a failed truncate, a failed despool, or when
 * the retry is exhausted.
 *
 * NOTE(review): presumably returns true after a full-size write and false
 * on every M_FATAL path — confirm.
 */
409 static bool write_spool_header(DCR *dcr)
413 DEV_BLOCK *block = dcr->block;
415 hdr.FirstIndex = block->FirstIndex;
416 hdr.LastIndex = block->LastIndex;
417 hdr.len = block->binbuf;
420 for (int retry=0; retry<=1; retry++) {
421 stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
424 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
427 if (stat != (ssize_t)sizeof(hdr)) {
428 /* If we wrote something, truncate it, then despool */
/* New length = current offset minus the bytes of the partial header */
430 if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR) - stat) != 0) {
432 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
437 if (!despool_data(dcr, false)) {
/* NOTE(review): unlike the other messages here, this one has no trailing newline */
438 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
441 continue; /* try again */
445 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
/*
 * Write the data of dcr->block (block->binbuf bytes from block->buf) to
 * the spool file, after its header has been written.
 *
 * The write is attempted at most twice (retry = 0 and 1).  After a short
 * write the file is truncated back by the partial data plus the header that
 * precedes it.  The spool is then despooled, the header is written again,
 * and the data write is retried.  An M_FATAL job message is issued for a
 * write error, a failed truncate, a failed despool, or when the retry is
 * exhausted.
 *
 * NOTE(review): presumably returns true after a full-size write and false
 * on every failure path — confirm.
 */
449 static bool write_spool_data(DCR *dcr)
452 DEV_BLOCK *block = dcr->block;
455 for (int retry=0; retry<=1; retry++) {
456 stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
459 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
462 if (stat != (ssize_t)block->binbuf) {
464 * If we wrote something, truncate it and the header, then despool
/* New length = current offset minus the partial data minus one spool header */
467 if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR)
468 - stat - sizeof(spool_hdr)) != 0) {
470 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
475 if (!despool_data(dcr, false)) {
/* NOTE(review): unlike the other messages here, this one has no trailing newline */
476 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
/* The header was truncated away with the partial data, so it is written again first */
479 if (!write_spool_header(dcr)) {
482 continue; /* try again */
486 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
/*
 * True when attribute spooling is enabled for the job (jcr->spool_attributes)
 * and the Director socket currently has a spool file attached
 * (jcr->dir_bsock->spool_fd is non-NULL).
 */
492 bool are_attributes_spooled(JCR *jcr)
494 return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
498 * Create spool file for attributes.
499 * This is done by "attaching" to the bsock, and when
500 * it is called, the output is written to a file.
501 * The actual spooling is turned on and off in
502 * append.c only during writing of the attributes.
/*
 * Open the attribute spool file on jcr->dir_bsock when the job both has
 * attributes (no_attributes is clear) and asks for attribute spooling;
 * in that case the result of open_attr_spool_file() is returned.
 *
 * NOTE(review): presumably returns true when there is nothing to open — confirm.
 */
504 bool begin_attribute_spool(JCR *jcr)
506 if (!jcr->no_attributes && jcr->spool_attributes) {
507 return open_attr_spool_file(jcr, jcr->dir_bsock);
/*
 * Drop the attribute spool without sending it: when attributes are being
 * spooled, close the spool file and return close_attr_spool_file()'s result.
 *
 * NOTE(review): presumably returns true when nothing was spooled — confirm.
 */
512 bool discard_attribute_spool(JCR *jcr)
514 if (are_attributes_spooled(jcr)) {
515 return close_attr_spool_file(jcr, jcr->dir_bsock);
/*
 * Subtract size bytes from spool_stats.attr_size, never letting it drop
 * below zero.  commit_attribute_spool() passes this function to
 * bnet_despool_to_bsock().
 */
520 static void update_attr_spool_size(ssize_t size)
/* A result of exactly zero takes the reset branch, which yields the same value */
524 if ((spool_stats.attr_size - size) > 0) {
525 spool_stats.attr_size -= size;
527 spool_stats.attr_size = 0;
/*
 * Send the spooled attributes to the Director and close the spool file.
 *
 * When attributes are being spooled, the file size is found by seeking to
 * the end and calling ftello().  That size is added to
 * spool_stats.attr_size (raising max_attr_size when the new total exceeds
 * it), and the file is then despooled to jcr->dir_bsock with
 * update_attr_spool_size as the callback.  An M_FATAL job message is issued
 * for a seek or tell failure.
 *
 * NOTE(review): both failure messages say "Fseek"; the second follows the
 * ftello() call and presumably reports its failure — confirm.
 * NOTE(review): the final close_attr_spool_file() call drops its result;
 * it is presumably the error-path cleanup before returning false — confirm.
 */
533 bool commit_attribute_spool(JCR *jcr)
538 if (are_attributes_spooled(jcr)) {
539 if (fseeko(jcr->dir_bsock->spool_fd, 0, SEEK_END) != 0) {
541 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
545 size = ftello(jcr->dir_bsock->spool_fd);
548 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
/* Raise the high-water mark before adding this file's size to the running total */
553 if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
554 spool_stats.max_attr_size = spool_stats.attr_size + size;
556 spool_stats.attr_size += size;
558 Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
559 edit_uint64_with_commas(size, ec1));
560 bnet_despool_to_bsock(jcr->dir_bsock, update_attr_spool_size, size);
561 return close_attr_spool_file(jcr, jcr->dir_bsock);
566 close_attr_spool_file(jcr, jcr->dir_bsock);
/*
 * Build the attribute spool file path into *name, of the form
 * "<working_directory>/<my_name>.attr.<%s>.<%d>.spool".
 *
 * NOTE(review): the last two values are presumably the job name and the fd
 * argument (callers pass bs->fd) — confirm.
 */
570 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
572 Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
/*
 * Create the attribute spool file and attach it to bs as bs->spool_fd.
 *
 * The file is opened with fopen(..., "w+"), so an existing file of the same
 * name is truncated.  When the open fails an M_FATAL job message naming the
 * file is issued.  spool_stats.attr_jobs is incremented, and the name
 * buffer is released on both paths.
 *
 * The forward declaration of this function is static, so it keeps internal
 * linkage even though static is not repeated here.
 *
 * NOTE(review): presumably attr_jobs is incremented only on success, and
 * the function returns false on failure and true otherwise — confirm.
 */
577 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
579 POOLMEM *name = get_pool_memory(PM_MESSAGE);
581 make_unique_spool_filename(jcr, &name, bs->fd);
582 bs->spool_fd = fopen(name, "w+");
585 Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
587 free_pool_memory(name);
591 spool_stats.attr_jobs++;
593 free_pool_memory(name);
/*
 * Close the attribute spool file attached to bs and update the statistics:
 * attr_jobs is decremented and total_attr_jobs incremented.
 *
 * The forward declaration of this function is static, so it keeps internal
 * linkage even though static is not repeated here.
 *
 * NOTE(review): the file name is rebuilt before fclose(), presumably so the
 * file can be unlinked afterwards — confirm.
 * NOTE(review): the return value of fclose() is not checked.
 */
597 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
604 name = get_pool_memory(PM_MESSAGE);
606 spool_stats.attr_jobs--;
607 spool_stats.total_attr_jobs++;
609 make_unique_spool_filename(jcr, &name, bs->fd);
610 fclose(bs->spool_fd);
612 free_pool_memory(name);