4 * Kern Sibbald, March 2004
9 Copyright (C) 2004-2006 Kern Sibbald
11 This program is free software; you can redistribute it and/or
12 modify it under the terms of the GNU General Public License
13 version 2 as amended with additional clauses defined in the
14 file LICENSE in the main source directory.
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 file LICENSE for additional details.
26 /* Forward referenced subroutines */
/* Data spool file management and despooling */
27 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
28 static bool open_data_spool_file(DCR *dcr);
29 static bool close_data_spool_file(DCR *dcr);
30 static bool despool_data(DCR *dcr, bool commit);
31 static int read_block_from_spool_file(DCR *dcr);
/* Attribute spool file attached to a BSOCK */
32 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
33 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
/* Per-record writers called from write_block_to_spool_file() */
34 static bool write_spool_header(DCR *dcr);
35 static bool write_spool_data(DCR *dcr);
/*
 * Global spooling counters; reported by list_spool_stats() and updated by
 * the open/close/commit routines in this file.
 */
37 struct spool_stats_t {
38 uint32_t data_jobs; /* current jobs spooling data */
40 uint32_t total_data_jobs; /* total jobs to have spooled data */
41 uint32_t total_attr_jobs; /* incremented each time an attribute spool file is closed */
42 int64_t max_data_size; /* max data size */
43 int64_t max_attr_size; /* high-water mark of attr_size plus the size being committed */
44 int64_t data_size; /* current data size (all jobs running) */
/* NOTE(review): presumably guards spool_stats; the lock/unlock calls are not visible in this excerpt -- confirm */
48 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/* The single global instance of the counters */
49 spool_stats_t spool_stats;
52 * Header for data spool record */
54 int32_t FirstIndex; /* FirstIndex for buffer */
55 int32_t LastIndex; /* LastIndex for buffer */
56 uint32_t len; /* length of next buffer */
/*
 * Report the global spooling counters to the given socket.
 *
 * A data spooling line is sent only when spool_stats.data_jobs or
 * spool_stats.max_data_size is non-zero, and an attribute spooling line
 * only when spool_stats.attr_jobs or spool_stats.max_attr_size is non-zero.
 * ed1 and ed2 are scratch buffers for edit_uint64_with_commas().
 *
 * NOTE(review): the data line labels max_data_size as a per-job maximum,
 * while write_block_to_spool_file() records it as the high-water mark of
 * spool_stats.data_size, which is commented as covering all running
 * jobs -- confirm which is intended.
 */
65 void list_spool_stats(BSOCK *bs)
67 char ed1[30], ed2[30];
68 if (spool_stats.data_jobs || spool_stats.max_data_size) {
69 bnet_fsend(bs, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
70 spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
71 spool_stats.total_data_jobs,
72 edit_uint64_with_commas(spool_stats.max_data_size, ed2));
74 if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
75 bnet_fsend(bs, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
76 spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
77 spool_stats.total_attr_jobs,
78 edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
/*
 * Turn on data spooling for this DCR when the job asks for it
 * (dcr->jcr->spool_data): flag the DCR, open the spool file with
 * open_data_spool_file(), tell the job that spooling has started and
 * count the job in spool_stats.data_jobs.
 *
 * NOTE(review): the test of stat that guards the job message and counter
 * update, and the return statement, are not visible in this excerpt --
 * confirm.
 */
82 bool begin_data_spool(DCR *dcr)
85 if (dcr->jcr->spool_data) {
86 Dmsg0(100, "Turning on data spooling\n");
87 dcr->spool_data = true;
88 stat = open_data_spool_file(dcr);
91 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
93 spool_stats.data_jobs++;
/*
 * Drop the spooled data of this DCR: the visible path logs the discard and
 * returns the result of close_data_spool_file() without calling
 * despool_data().
 */
100 bool discard_data_spool(DCR *dcr)
103 Dmsg0(100, "Data spooling discarded\n");
104 return close_data_spool_file(dcr);
/*
 * Commit the spooled data of this DCR: despool_data() is called with
 * commit set to true, and the spool file is closed afterwards.
 *
 * The spool file is also closed on the path that logs the bad despool
 * return; otherwise the function returns the result of
 * close_data_spool_file().
 */
109 bool commit_data_spool(DCR *dcr)
114 Dmsg0(100, "Committing spooled data\n");
115 stat = despool_data(dcr, true /*commit*/);
117 Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
118 close_data_spool_file(dcr);
121 return close_data_spool_file(dcr);
/*
 * Format the data spool file path into *name as
 *    <dir>/<my_name>.data.<dcr->jcr->Job>.<dcr->device->hdr.name>.spool
 * where dir is dcr->dev->device->spool_directory when that is set, and
 * working_directory otherwise.
 */
126 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
129 if (dcr->dev->device->spool_directory) {
130 dir = dcr->dev->device->spool_directory;
132 dir = working_directory;
134 Mmsg(name, "%s/%s.data.%s.%s.spool", dir, my_name, dcr->jcr->Job,
135 dcr->device->hdr.name);
/*
 * Create the data spool file (O_CREAT|O_TRUNC, mode 0640) under the name
 * built by make_unique_data_spool_filename() and remember its descriptor
 * in dcr->spool_fd.
 *
 * A successful open also sets dcr->jcr->spool_attributes; a failed open
 * queues an M_FATAL job message.  The name buffer obtained from
 * get_pool_memory() is released on both visible paths.
 */
139 static bool open_data_spool_file(DCR *dcr)
141 POOLMEM *name = get_pool_memory(PM_MESSAGE);
144 make_unique_data_spool_filename(dcr, &name);
145 if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
146 dcr->spool_fd = spool_fd;
147 dcr->jcr->spool_attributes = true;
150 Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
152 free_pool_memory(name);
155 Dmsg1(100, "Created spool file: %s\n", name);
156 free_pool_memory(name);
/*
 * Finish data spooling for this DCR.
 *
 * Moves the job from the active count to the total count, removes
 * dcr->job_spool_size from spool_stats.data_size (set to zero instead when
 * the subtraction would go negative), zeroes the per-job size, closes the
 * spool descriptor and clears dcr->spooling.
 *
 * NOTE(review): the file name is rebuilt and then reported as deleted, so
 * the removal call is presumably on a line not visible here -- confirm.
 */
160 static bool close_data_spool_file(DCR *dcr)
162 POOLMEM *name = get_pool_memory(PM_MESSAGE);
165 spool_stats.data_jobs--;
166 spool_stats.total_data_jobs++;
167 if (spool_stats.data_size < dcr->job_spool_size) {
168 spool_stats.data_size = 0;
170 spool_stats.data_size -= dcr->job_spool_size;
172 dcr->job_spool_size = 0;
175 make_unique_data_spool_filename(dcr, &name);
176 close(dcr->spool_fd);
178 dcr->spooling = false;
180 Dmsg1(100, "Deleted spool file: %s\n", name);
181 free_pool_memory(name);
/* Name given to the temporary device that reads the spool file back */
185 static const char *spool_name = "*spool*";
/*
 * Write the contents of the data spool file to the device of this DCR.
 *
 * A temporary DEVICE (rdev) and DCR (rdcr) sharing dcr->spool_fd are set
 * up so that read_block_from_spool_file() can fill the same block buffer
 * that write_block_to_device() then writes out.  The device is locked for
 * the duration.  Afterwards the spool file is rewound and truncated,
 * dcr->job_spool_size is removed from spool_stats.data_size and from
 * dcr->dev->spool_size, the temporary objects are released and spooling
 * is switched back on.
 *
 * commit: selects which informational message is sent to the job; per the
 *         comment in the body it means the job is done, as opposed to
 *         despooling because a size limit or an error was hit.
 */
187 static bool despool_data(DCR *dcr, bool commit)
197 Dmsg0(100, "Despooling data\n");
198 /* Commit means that the job is done, so we commit, otherwise, we
199 * are despooling because of user spool size max or some error
200 * (e.g. filesystem full).
203 Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
204 jcr->dcr->VolumeName,
205 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
207 Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
208 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
210 dcr->spooling = false;
211 dcr->despooling = true;
212 lock_device(dcr->dev);
213 dcr->dev_locked = true;
216 * This is really quite kludgy and should be fixed some time.
217 * We create a dev structure to read from the spool file
220 rdev = (DEVICE *)malloc(sizeof(DEVICE));
221 memset(rdev, 0, sizeof(DEVICE));
222 rdev->dev_name = get_memory(strlen(spool_name)+1);
/* dev_name is a pointer, so sizeof(rdev->dev_name) would be the pointer
 * size rather than the buffer size; pass the length just allocated. */
223 bstrncpy(rdev->dev_name, spool_name, strlen(spool_name)+1);
224 rdev->errmsg = get_pool_memory(PM_EMSG);
226 rdev->max_block_size = dcr->dev->max_block_size;
227 rdev->min_block_size = dcr->dev->min_block_size;
228 rdev->device = dcr->dev->device;
229 rdcr = new_dcr(NULL, rdev);
230 rdcr->spool_fd = dcr->spool_fd;
231 rdcr->jcr = jcr; /* set a valid jcr */
232 block = dcr->block; /* save block */
233 dcr->block = rdcr->block; /* make read and write block the same */
235 Dmsg1(800, "read/write block size = %d\n", block->buf_len);
236 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
239 if (job_canceled(jcr)) {
243 stat = read_block_from_spool_file(rdcr);
244 if (stat == RB_EOT) {
246 } else if (stat == RB_ERROR) {
250 ok = write_block_to_device(dcr);
252 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
253 dcr->dev->print_name(), dcr->dev->bstrerror());
255 Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
257 dcr->block = block; /* reset block */
259 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
260 if (ftruncate(rdcr->spool_fd, 0) != 0) {
262 Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
264 Pmsg1(000, _("Bad return from ftruncate. ERR=%s\n"), be.strerror());
269 if (spool_stats.data_size < dcr->job_spool_size) {
270 spool_stats.data_size = 0;
272 spool_stats.data_size -= dcr->job_spool_size;
275 P(dcr->dev->spool_mutex);
276 dcr->dev->spool_size -= dcr->job_spool_size;
277 dcr->job_spool_size = 0; /* zap size in input dcr */
278 V(dcr->dev->spool_mutex);
279 free_memory(rdev->dev_name);
280 free_pool_memory(rdev->errmsg);
281 /* Be careful to NULL the jcr and free rdev after free_dcr() */
285 unlock_device(dcr->dev);
286 dcr->dev_locked = false;
287 dcr->spooling = true; /* turn on spooling again */
288 dcr->despooling = false;
293 * Read a block from the spool file
295 * Returns RB_OK on success
296 * RB_EOT when file done
/*
 * Read the next spool record (header first, then data) from dcr->spool_fd
 * into dcr->block.
 *
 * Conditions that queue an M_FATAL job message: a short header read, a
 * record length larger than block->buf_len, and a short data read.
 * On success the block write pointers (binbuf, bufp), FirstIndex and
 * LastIndex from the header, and VolSessionId/VolSessionTime from the JCR
 * are filled in.
 *
 * NOTE(review): stat is compared against (ssize_t)rlen but printed with
 * %d; if stat is declared ssize_t the format does not match where ssize_t
 * is wider than int -- confirm.
 */
299 static int read_block_from_spool_file(DCR *dcr)
304 DEV_BLOCK *block = dcr->block;
307 stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
309 Dmsg0(100, "EOT on spool read.\n");
311 } else if (stat != (ssize_t)rlen) {
314 Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
317 Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
318 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
323 if (rlen > block->buf_len) {
324 Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
325 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
328 stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
329 if (stat != (ssize_t)rlen) {
330 Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
331 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
334 /* Setup write pointers */
335 block->binbuf = rlen;
336 block->bufp = block->buf + block->binbuf;
337 block->FirstIndex = hdr.FirstIndex;
338 block->LastIndex = hdr.LastIndex;
339 block->VolSessionId = dcr->jcr->VolSessionId;
340 block->VolSessionTime = dcr->jcr->VolSessionTime;
341 Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
346 * Write a block to the spool file
348 * Returns: true on success or EOT
349 * false on hard error
/*
 * Append the current block of the DCR to the data spool file.
 *
 * The block plus one spool header (hlen + wlen) is first charged to
 * dcr->job_spool_size and dcr->dev->spool_size under dev->spool_mutex, and
 * to the global spool_stats.data_size, whose high-water mark is kept in
 * spool_stats.max_data_size.  The limit test compares those sizes with
 * max_job_spool_size and dev->max_spool_size, each only when greater than
 * zero.  On the despool path the spool is flushed with
 * despool_data(dcr, false) and, because despooling zeroes the sizes, the
 * pending block is charged again.  The record is then written with
 * write_spool_header() followed by write_spool_data().
 *
 * NOTE(review): the line that sets the despool flag and the return
 * statements are not visible in this excerpt -- confirm.
 */
351 bool write_block_to_spool_file(DCR *dcr)
353 uint32_t wlen, hlen; /* length to write */
354 bool despool = false;
355 DEV_BLOCK *block = dcr->block;
357 ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
358 if (block->binbuf <= WRITE_BLKHDR_LENGTH) { /* Does block have data in it? */
362 hlen = sizeof(spool_hdr);
363 wlen = block->binbuf;
364 P(dcr->dev->spool_mutex);
365 dcr->job_spool_size += hlen + wlen;
366 dcr->dev->spool_size += hlen + wlen;
367 if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
368 (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
371 V(dcr->dev->spool_mutex);
373 spool_stats.data_size += hlen + wlen;
374 if (spool_stats.data_size > spool_stats.max_data_size) {
375 spool_stats.max_data_size = spool_stats.data_size;
380 char ec1[30], ec2[30], ec3[30], ec4[30];
381 Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
382 "max_job_size=%s job_size=%s\n",
383 edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
384 edit_uint64_with_commas(dcr->job_spool_size, ec2),
385 edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
386 edit_uint64_with_commas(dcr->dev->spool_size, ec4));
388 Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
389 if (!despool_data(dcr, false)) {
390 Pmsg0(000, _("Bad return from despool in write_block.\n"));
393 /* Despooling cleared these variables so reset them */
394 P(dcr->dev->spool_mutex);
395 dcr->job_spool_size += hlen + wlen;
396 dcr->dev->spool_size += hlen + wlen;
397 V(dcr->dev->spool_mutex);
398 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
402 if (!write_spool_header(dcr)) {
405 if (!write_spool_data(dcr)) {
409 Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
/*
 * Write the spool header for the current block (FirstIndex, LastIndex and
 * len taken from block->binbuf) to dcr->spool_fd.
 *
 * At most two attempts are made (retry = 0 and 1).  When the byte count
 * returned by write() differs from sizeof(hdr), the partial bytes are cut
 * off again with ftruncate(), the spool is flushed with
 * despool_data(dcr, false) and the write is attempted once more.  Falling
 * out of the loop queues an M_FATAL job message saying the retry failed.
 */
414 static bool write_spool_header(DCR *dcr)
418 DEV_BLOCK *block = dcr->block;
420 hdr.FirstIndex = block->FirstIndex;
421 hdr.LastIndex = block->LastIndex;
422 hdr.len = block->binbuf;
425 for (int retry=0; retry<=1; retry++) {
426 stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
429 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
432 if (stat != (ssize_t)sizeof(hdr)) {
433 /* If we wrote something, truncate it, then despool */
435 if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR) - stat) != 0) {
437 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
442 if (!despool_data(dcr, false)) {
443 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
446 continue; /* try again */
450 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
/*
 * Write the data of the current block (block->binbuf bytes from
 * block->buf) to dcr->spool_fd.
 *
 * At most two attempts are made (retry = 0 and 1).  When the byte count
 * returned by write() differs from block->binbuf, ftruncate() removes the
 * partial data together with the spool header that precedes it, the spool
 * is flushed with despool_data(dcr, false), the header is written again
 * with write_spool_header() and the data write is retried.  Falling out of
 * the loop queues an M_FATAL job message saying the retry failed.
 */
454 static bool write_spool_data(DCR *dcr)
457 DEV_BLOCK *block = dcr->block;
460 for (int retry=0; retry<=1; retry++) {
461 stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
464 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
467 if (stat != (ssize_t)block->binbuf) {
469 * If we wrote something, truncate it and the header, then despool
472 if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR)
473 - stat - sizeof(spool_hdr)) != 0) {
475 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
480 if (!despool_data(dcr, false)) {
481 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
484 if (!write_spool_header(dcr)) {
487 continue; /* try again */
491 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
/*
 * Return true when jcr->spool_attributes is set and jcr->dir_bsock has a
 * non-NULL spool_fd.
 */
497 bool are_attributes_spooled(JCR *jcr)
499 return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
503 * Create spool file for attributes.
504 * This is done by "attaching" to the bsock, and when
505 * it is called, the output is written to a file.
506 * The actual spooling is turned on and off in
507 * append.c only during writing of the attributes.
/*
 * Open the attribute spool file on jcr->dir_bsock when jcr->no_attributes
 * is false and jcr->spool_attributes is set, returning the result of
 * open_attr_spool_file().
 * NOTE(review): the return value for the other case is not visible in
 * this excerpt -- confirm.
 */
509 bool begin_attribute_spool(JCR *jcr)
511 if (!jcr->no_attributes && jcr->spool_attributes) {
512 return open_attr_spool_file(jcr, jcr->dir_bsock);
/*
 * When attributes are being spooled (see are_attributes_spooled()), close
 * the attribute spool file on jcr->dir_bsock and return the result of
 * close_attr_spool_file().
 * NOTE(review): the return value for the other case is not visible in
 * this excerpt -- confirm.
 */
517 bool discard_attribute_spool(JCR *jcr)
519 if (are_attributes_spooled(jcr)) {
520 return close_attr_spool_file(jcr, jcr->dir_bsock);
/*
 * Reduce spool_stats.attr_size by size; when the result would not be
 * positive the counter is set to zero instead.
 * Passed as the callback argument to bnet_despool_to_bsock() by
 * commit_attribute_spool().
 */
525 static void update_attr_spool_size(ssize_t size)
529 if ((spool_stats.attr_size - size) > 0) {
530 spool_stats.attr_size -= size;
532 spool_stats.attr_size = 0;
/*
 * Send the spooled attributes of the job to the Director.
 *
 * When attributes are being spooled, the size of the spool file is found
 * by seeking to its end and calling ftello().  That size is added to
 * spool_stats.attr_size (updating the max_attr_size high-water mark), the
 * job is told how many bytes are being despooled, the file is sent with
 * bnet_despool_to_bsock() using update_attr_spool_size() as the callback,
 * and the spool file is closed.  Seek and size failures queue an M_FATAL
 * job message; a further call to close_attr_spool_file() follows for a
 * path whose condition is not visible in this excerpt.
 *
 * NOTE(review): the message that follows the ftello() call also says
 * Fseek -- confirm whether that wording is intended.
 */
538 bool commit_attribute_spool(JCR *jcr)
543 if (are_attributes_spooled(jcr)) {
544 if (fseeko(jcr->dir_bsock->spool_fd, 0, SEEK_END) != 0) {
546 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
550 size = ftello(jcr->dir_bsock->spool_fd);
553 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
558 if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
559 spool_stats.max_attr_size = spool_stats.attr_size + size;
561 spool_stats.attr_size += size;
563 Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
564 edit_uint64_with_commas(size, ec1));
565 bnet_despool_to_bsock(jcr->dir_bsock, update_attr_spool_size, size);
566 return close_attr_spool_file(jcr, jcr->dir_bsock);
571 close_attr_spool_file(jcr, jcr->dir_bsock);
/*
 * Format the attribute spool file path into *name; the path starts with
 * working_directory and my_name, followed by a string and an integer
 * field before the .spool suffix.
 * NOTE(review): the last two format arguments are on a line not visible
 * here; presumably the job name and fd -- confirm.
 */
575 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
577 Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
/*
 * Open the attribute spool file for the given socket in w+b mode, under
 * the name built by make_unique_spool_filename() from bs->fd, and store
 * the stream in bs->spool_fd.
 *
 * A failed fopen() queues an M_FATAL job message; on the other path
 * spool_stats.attr_jobs is incremented.  The name buffer is released on
 * both visible paths.
 */
582 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
584 POOLMEM *name = get_pool_memory(PM_MESSAGE);
586 make_unique_spool_filename(jcr, &name, bs->fd);
587 bs->spool_fd = fopen(name, "w+b");
590 Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
592 free_pool_memory(name);
596 spool_stats.attr_jobs++;
598 free_pool_memory(name);
/*
 * Close the attribute spool stream of the given socket.
 *
 * Moves the job from the active attribute count to the total count,
 * calls fclose() on bs->spool_fd and releases the name buffer.
 *
 * NOTE(review): the file name is rebuilt but not used on any visible
 * line; presumably the file is removed on a line not shown -- confirm.
 */
602 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
609 name = get_pool_memory(PM_MESSAGE);
611 spool_stats.attr_jobs--;
612 spool_stats.total_attr_jobs++;
614 make_unique_spool_filename(jcr, &name, bs->fd);
615 fclose(bs->spool_fd);
617 free_pool_memory(name);