4 * Kern Sibbald, March 2004
9 Copyright (C) 2000-2004 Kern Sibbald and John Walker
11 This program is free software; you can redistribute it and/or
12 modify it under the terms of the GNU General Public License as
13 published by the Free Software Foundation; either version 2 of
14 the License, or (at your option) any later version.
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 General Public License for more details.
21 You should have received a copy of the GNU General Public
22 License along with this program; if not, write to the Free
23 Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
31 /* Forward referenced subroutines */
/*
 * Prototypes for helpers that are called before their definitions in this
 * file: data spool file naming/open/close, despooling, spool record
 * read/write, and attribute spool file open/close.
 */
32 static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name);
33 static bool open_data_spool_file(JCR *jcr);
34 static bool close_data_spool_file(JCR *jcr);
35 static bool despool_data(DCR *dcr, bool commit);
36 static int read_block_from_spool_file(DCR *dcr);
37 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
38 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
39 static bool write_spool_header(DCR *dcr);
40 static bool write_spool_data(DCR *dcr);
/*
 * Spooling counters and sizes; these are what list_spool_stats() reports.
 * NOTE(review): attr_jobs and attr_size are read and written elsewhere in
 * this file (list_spool_stats, update_attr_spool_size, commit_attribute_spool)
 * but their declarations are not visible here -- confirm they are members.
 */
42 struct spool_stats_t {
43 uint32_t data_jobs; /* current jobs spooling data */
45 uint32_t total_data_jobs; /* total jobs to have spooled data */
46 uint32_t total_attr_jobs; /* incremented each time an attribute spool file is closed */
47 int64_t max_data_size; /* max data size */
48 int64_t max_attr_size; /* high-water mark updated in commit_attribute_spool() */
49 int64_t data_size; /* current data size (all jobs running) */
/*
 * File-scope mutex and the single spool_stats instance.
 * NOTE(review): presumably the mutex guards spool_stats; no lock/unlock
 * calls on it are visible here -- confirm.
 */
53 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
54 spool_stats_t spool_stats;
57 * Header for data spool record */
/*
 * One header is written in front of each block's data in the spool file
 * (write_spool_header) and read back by read_block_from_spool_file.
 * The fields are filled from block->FirstIndex, block->LastIndex and
 * block->binbuf respectively.
 * NOTE(review): the opening line of this struct is not visible here; the
 * type is referred to as spool_hdr elsewhere in the file.
 */
59 int32_t FirstIndex; /* FirstIndex for buffer */
60 int32_t LastIndex; /* LastIndex for buffer */
61 uint32_t len; /* length of next buffer */
/*
 * Report spooling statistics on the given socket.
 *
 * Sends a "Data spooling" line when there are active data-spool jobs or a
 * non-zero recorded maximum data size, and an "Attr spooling" line under the
 * equivalent attribute conditions. Byte counts are formatted by
 * edit_uint64_with_commas() into the local scratch buffers ed1 and ed2.
 *
 *   bs - socket the status lines are sent to with bnet_fsend()
 */
70 void list_spool_stats(BSOCK *bs)
72 char ed1[30], ed2[30];
73 if (spool_stats.data_jobs || spool_stats.max_data_size) {
74 bnet_fsend(bs, "Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n",
75 spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
76 spool_stats.total_data_jobs,
77 edit_uint64_with_commas(spool_stats.max_data_size, ed2));
79 if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
80 bnet_fsend(bs, "Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n",
81 spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
82 spool_stats.total_attr_jobs,
83 edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
/*
 * Turn on data spooling for a job when jcr->spool_data is set.
 *
 * Marks the job's DCR as using data spooling, opens the spool file with
 * open_data_spool_file(), sets dcr->spooling, logs "Spooling data ..." and
 * increments spool_stats.data_jobs.
 *
 * NOTE(review): `stat` receives the open result, but its declaration, the
 * test guarding the statements that follow, and the return statement are not
 * visible here -- presumably the open status is what is returned. Confirm.
 */
87 bool begin_data_spool(JCR *jcr)
90 if (jcr->spool_data) {
91 Dmsg0(100, "Turning on data spooling\n");
92 jcr->dcr->spool_data = true;
93 stat = open_data_spool_file(jcr);
95 jcr->dcr->spooling = true;
96 Jmsg(jcr, M_INFO, 0, _("Spooling data ...\n"));
98 spool_stats.data_jobs++;
/*
 * Discard the job's spooled data: when the DCR is spooling, the spool file is
 * closed through close_data_spool_file() and that result is returned.
 * NOTE(review): the value returned when the DCR is not spooling is not
 * visible here.
 */
105 bool discard_data_spool(JCR *jcr)
107 if (jcr->dcr->spooling) {
108 Dmsg0(100, "Data spooling discarded\n");
109 return close_data_spool_file(jcr);
/*
 * Despool the job's spooled data with commit=true, then close the spool file.
 *
 * The Pmsg1 "Bad return from despool" path also closes the spool file; the
 * normal path returns the result of close_data_spool_file().
 * NOTE(review): the test on `stat` and the values returned on the failure and
 * non-spooling paths are not visible here -- confirm.
 */
114 bool commit_data_spool(JCR *jcr)
118 if (jcr->dcr->spooling) {
119 Dmsg0(100, "Committing spooled data\n");
120 stat = despool_data(jcr->dcr, true /*commit*/);
122 Pmsg1(000, "Bad return from despool WroteVol=%d\n", jcr->dcr->WroteVol);
123 close_data_spool_file(jcr);
126 return close_data_spool_file(jcr);
/*
 * Build the data spool file name for a job into *name:
 *    <dir>/<my_name>.data.spool.<jcr->Job>.<device resource name>
 * <dir> is the device's configured spool_directory when set, otherwise
 * working_directory.
 *
 *   jcr  - job whose Job name and device name make the path unique
 *   name - pool memory buffer that Mmsg() formats the path into
 */
131 static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name)
134 if (jcr->dcr->dev->device->spool_directory) {
135 dir = jcr->dcr->dev->device->spool_directory;
137 dir = working_directory;
139 Mmsg(name, "%s/%s.data.spool.%s.%s", dir, my_name, jcr->Job, jcr->device->hdr.name);
/*
 * Create (or truncate) the job's data spool file and remember its descriptor.
 *
 * The file is opened read/write with mode 0640. On success the descriptor is
 * stored in jcr->dcr->spool_fd and jcr->spool_attributes is set to true; on
 * failure an M_FATAL job message is queued. The temporary name buffer is
 * released on both paths.
 * NOTE(review): the return statements are not visible here -- presumably
 * false on open failure and true otherwise.
 */
143 static bool open_data_spool_file(JCR *jcr)
145 POOLMEM *name = get_pool_memory(PM_MESSAGE);
148 make_unique_data_spool_filename(jcr, &name);
149 if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
150 jcr->dcr->spool_fd = spool_fd;
/* Opening a data spool file also switches attribute spooling on for the job. */
151 jcr->spool_attributes = true;
154 Jmsg(jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
156 free_pool_memory(name);
159 Dmsg1(100, "Created spool file: %s\n", name);
160 free_pool_memory(name);
/*
 * Close the job's data spool file and update the global statistics.
 *
 * Decrements the active data-job count, increments the total, and subtracts
 * this job's spool_size from spool_stats.data_size, clamping at zero so the
 * total never goes negative. The DCR's spool_size, spool_fd and spooling flag
 * are then reset.
 * NOTE(review): the debug message says the file was deleted, but no unlink
 * call (nor the return statement) is visible here -- confirm.
 */
164 static bool close_data_spool_file(JCR *jcr)
166 POOLMEM *name = get_pool_memory(PM_MESSAGE);
169 spool_stats.data_jobs--;
170 spool_stats.total_data_jobs++;
171 if (spool_stats.data_size < jcr->dcr->spool_size) {
172 spool_stats.data_size = 0;
174 spool_stats.data_size -= jcr->dcr->spool_size;
176 jcr->dcr->spool_size = 0;
179 make_unique_data_spool_filename(jcr, &name);
180 close(jcr->dcr->spool_fd);
181 jcr->dcr->spool_fd = -1;
182 jcr->dcr->spooling = false;
184 Dmsg1(100, "Deleted spool file: %s\n", name);
185 free_pool_memory(name);
/* Device name given to the temporary read-side DEVICE built in despool_data(). */
189 static const char *spool_name = "*spool*";
/*
 * Copy everything in the job's data spool file to the real device.
 *
 * Spooling is switched off and the device locked for the duration. A
 * temporary DEVICE (rdev) and DCR (rdcr) are built so the spool file can be
 * read through read_block_from_spool_file(); the write DCR temporarily shares
 * rdcr's block so each block read is written with write_block_to_device()
 * without copying. Afterwards the original block is restored, the spool file
 * is rewound and truncated to zero, the despooled byte counts are removed
 * from the global, per-device and per-job totals, the temporaries are freed,
 * the device is unlocked and spooling is switched back on.
 *
 *   dcr    - write-side device control record owning the spool file
 *   commit - only selects the wording of the job message here
 *            ("Committing" vs "Writing")
 *
 * NOTE(review): the read/write loop structure, the handling of cancel, EOT
 * and error results, and the return statement are not visible here.
 */
191 static bool despool_data(DCR *dcr, bool commit)
201 Dmsg0(100, "Despooling data\n");
202 Jmsg(jcr, M_INFO, 0, _("%s spooled data to Volume. Despooling %s bytes ...\n"),
203 commit?"Committing":"Writing",
204 edit_uint64_with_commas(jcr->dcr->dev->spool_size, ec1));
205 dcr->spooling = false;
206 lock_device(dcr->dev);
207 dcr->dev_locked = true;
210 * This is really quite kludgy and should be fixed some time.
211 * We create a dev structure to read from the spool file
214 rdev = (DEVICE *)malloc(sizeof(DEVICE));
215 memset(rdev, 0, sizeof(DEVICE));
216 rdev->dev_name = get_memory(strlen(spool_name)+1);
/*
 * dev_name is a pointer to the buffer allocated just above, so sizeof() on it
 * would give the pointer size rather than the buffer size and could truncate
 * the name; bound the copy by the size that was actually allocated.
 */
217 bstrncpy(rdev->dev_name, spool_name, strlen(spool_name)+1);
218 rdev->errmsg = get_pool_memory(PM_EMSG);
220 rdev->max_block_size = dcr->dev->max_block_size;
221 rdev->min_block_size = dcr->dev->min_block_size;
222 rdev->device = dcr->dev->device;
223 rdcr = new_dcr(NULL, rdev);
224 rdcr->spool_fd = dcr->spool_fd;
225 rdcr->jcr = jcr; /* set a valid jcr */
226 block = dcr->block; /* save block */
227 dcr->block = rdcr->block; /* make read and write block the same */
229 Dmsg1(800, "read/write block size = %d\n", block->buf_len);
230 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
233 if (job_canceled(jcr)) {
237 stat = read_block_from_spool_file(rdcr);
238 if (stat == RB_EOT) {
240 } else if (stat == RB_ERROR) {
244 ok = write_block_to_device(dcr);
245 Dmsg3(100, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
247 dcr->block = block; /* reset block */
/* Everything has been copied out: empty the spool file so it can be reused. */
249 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
250 if (ftruncate(rdcr->spool_fd, 0) != 0) {
252 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
254 Pmsg1(000, "Bad return from ftruncate. ERR=%s\n", be.strerror());
/* Remove this job's despooled bytes from the global total, clamping at zero. */
259 if (spool_stats.data_size < dcr->spool_size) {
260 spool_stats.data_size = 0;
262 spool_stats.data_size -= dcr->spool_size;
265 P(dcr->dev->spool_mutex);
266 dcr->dev->spool_size -= dcr->spool_size;
267 dcr->spool_size = 0; /* zap size in input dcr */
268 V(dcr->dev->spool_mutex);
269 free_memory(rdev->dev_name);
270 free_pool_memory(rdev->errmsg);
271 /* Be careful to NULL the jcr and free rdev after free_dcr() */
275 unlock_device(dcr->dev);
276 dcr->dev_locked = false;
277 dcr->spooling = true; /* turn on spooling again */
282 * Read a block from the spool file
284 * Returns RB_OK on success
285 * RB_EOT when file done
/*
 * Reads one spool record from dcr->spool_fd into dcr->block: first the
 * spool_hdr, then the block data. A data length larger than block->buf_len
 * is reported as M_FATAL "Spool block too big". After a successful read the
 * block's binbuf/bufp are positioned just past the data, FirstIndex and
 * LastIndex come from the header, and the session id/time come from the jcr.
 *
 * NOTE(review): the assignments to rlen (presumably sizeof(hdr) for the
 * first read and hdr.len for the second), the end-of-file test and the
 * return statements are not visible here -- confirm.
 */
288 static int read_block_from_spool_file(DCR *dcr)
293 DEV_BLOCK *block = dcr->block;
296 stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
298 Dmsg0(100, "EOT on spool read.\n");
300 } else if (stat != (ssize_t)rlen) {
303 Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
306 Pmsg2(000, "Spool read error. Wanted %u bytes, got %u\n", rlen, stat);
307 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %u\n"), rlen, stat);
/* Refuse records that would not fit in the block buffer. */
312 if (rlen > block->buf_len) {
313 Pmsg2(000, "Spool block too big. Max %u bytes, got %u\n", block->buf_len, rlen);
314 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
317 stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
318 if (stat != (ssize_t)rlen) {
319 Pmsg2(000, "Spool data read error. Wanted %u bytes, got %u\n", rlen, stat);
320 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %u\n"), rlen, stat);
323 /* Setup write pointers */
324 block->binbuf = rlen;
325 block->bufp = block->buf + block->binbuf;
326 block->FirstIndex = hdr.FirstIndex;
327 block->LastIndex = hdr.LastIndex;
328 block->VolSessionId = dcr->jcr->VolSessionId;
329 block->VolSessionTime = dcr->jcr->VolSessionTime;
330 Dmsg2(100, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
335 * Write a block to the spool file
337 * Returns: true on success or EOT
338 * false on hard error
/*
 * Appends dcr->block to the data spool file as a spool_hdr followed by the
 * block data.
 *
 * The header and data lengths are added to the per-job and per-device spool
 * sizes under dev->spool_mutex, and to spool_stats.data_size, whose maximum
 * is tracked in max_data_size. The size test compares both totals against
 * their configured limits (a limit of 0 disables that test); the despool
 * path then runs despool_data(dcr, false) and re-adds this block's size,
 * because despooling zeroes the counters.
 *
 * NOTE(review): the action for a block with no data beyond the block header,
 * the setting and testing of `despool`, and the return statements are not
 * visible here -- confirm.
 */
340 bool write_block_to_spool_file(DCR *dcr)
342 uint32_t wlen, hlen; /* length to write */
343 bool despool = false;
344 DEV_BLOCK *block = dcr->block;
346 ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
347 if (block->binbuf <= WRITE_BLKHDR_LENGTH) { /* Does block have data in it? */
351 hlen = sizeof(spool_hdr);
352 wlen = block->binbuf;
353 P(dcr->dev->spool_mutex);
354 dcr->spool_size += hlen + wlen;
355 dcr->dev->spool_size += hlen + wlen;
356 if ((dcr->max_spool_size > 0 && dcr->spool_size >= dcr->max_spool_size) ||
357 (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
360 V(dcr->dev->spool_mutex);
362 spool_stats.data_size += hlen + wlen;
363 if (spool_stats.data_size > spool_stats.max_data_size) {
364 spool_stats.max_data_size = spool_stats.data_size;
/*
 * NOTE(review): in the debug message below the max_size/size labels receive
 * the DCR (per-job) values and the max_job_size/job_size labels receive the
 * device values; the labels look swapped -- confirm.
 */
369 char ec1[30], ec2[30], ec3[30], ec4[30];
370 Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
371 "max_job_size=%s job_size=%s\n",
372 edit_uint64_with_commas(dcr->max_spool_size, ec1),
373 edit_uint64_with_commas(dcr->spool_size, ec2),
374 edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
375 edit_uint64_with_commas(dcr->dev->spool_size, ec4));
377 Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
378 if (!despool_data(dcr, false)) {
379 Pmsg0(000, "Bad return from despool in write_block.\n");
382 /* Despooling cleared these variables so reset them */
383 P(dcr->dev->spool_mutex);
384 dcr->spool_size += hlen + wlen;
385 dcr->dev->spool_size += hlen + wlen;
386 V(dcr->dev->spool_mutex);
387 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
391 if (!write_spool_header(dcr)) {
394 if (!write_spool_data(dcr)) {
398 Dmsg2(100, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
/*
 * Write the spool_hdr for dcr->block to the spool file.
 *
 * The header carries the block's FirstIndex, LastIndex and binbuf. The write
 * is attempted at most twice: after a short write the partial header is cut
 * off again with ftruncate(), the spool is emptied with
 * despool_data(dcr, false), and the write is retried. If the loop finishes
 * without success, "Retrying after header spooling error failed" is queued
 * as M_FATAL.
 *
 * NOTE(review): the tests that select the error branches and the return
 * statements are not visible here. "Fatal despooling error." has no trailing
 * newline, unlike the other job messages in this file -- confirm intent.
 */
403 static bool write_spool_header(DCR *dcr)
407 DEV_BLOCK *block = dcr->block;
409 hdr.FirstIndex = block->FirstIndex;
410 hdr.LastIndex = block->LastIndex;
411 hdr.len = block->binbuf;
414 for (int retry=0; retry<=1; retry++) {
415 stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
418 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
421 if (stat != (ssize_t)sizeof(hdr)) {
422 /* If we wrote something, truncate it, then despool */
/* New length = current offset minus the bytes of the partial header. */
424 if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR) - stat) != 0) {
426 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
431 if (!despool_data(dcr, false)) {
432 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
435 continue; /* try again */
439 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
/*
 * Write the data of dcr->block (block->binbuf bytes) to the spool file.
 *
 * The write is attempted at most twice: after a short write both the partial
 * data and the header that precedes it are cut off with ftruncate(), the
 * spool is emptied with despool_data(dcr, false), the header is written
 * again and the data write is retried. If the loop finishes without success,
 * "Retrying after data spooling error failed" is queued as M_FATAL.
 *
 * NOTE(review): the tests that select the error branches and the return
 * statements are not visible here. "Fatal despooling error." has no trailing
 * newline, unlike the other job messages in this file -- confirm intent.
 */
443 static bool write_spool_data(DCR *dcr)
446 DEV_BLOCK *block = dcr->block;
449 for (int retry=0; retry<=1; retry++) {
450 stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
453 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
456 if (stat != (ssize_t)block->binbuf) {
458 * If we wrote something, truncate it and the header, then despool
/* New length = current offset minus the partial data and its header. */
461 if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR)
462 - stat - sizeof(spool_hdr)) != 0) {
464 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
469 if (!despool_data(dcr, false)) {
470 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
473 if (!write_spool_header(dcr)) {
476 continue; /* try again */
480 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
/*
 * True when attribute spooling is enabled for the job and the Director
 * socket currently has an attribute spool file attached (spool_fd non-NULL).
 */
486 bool are_attributes_spooled(JCR *jcr)
488 return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
492 * Create spool file for attributes.
493 * This is done by "attaching" to the bsock, and when
494 * it is called, the output is written to a file.
495 * The actual spooling is turned on and off in
496 * append.c only during writing of the attributes.
/*
 * Opens the attribute spool file on the Director socket when the job produces
 * attributes (no_attributes is false) and spool_attributes is set, returning
 * the result of open_attr_spool_file().
 * NOTE(review): the value returned when no spool file is opened is not
 * visible here.
 */
498 bool begin_attribute_spool(JCR *jcr)
500 if (!jcr->no_attributes && jcr->spool_attributes) {
501 return open_attr_spool_file(jcr, jcr->dir_bsock);
/*
 * Drop spooled attributes without sending them: when attributes are being
 * spooled, the spool file is closed via close_attr_spool_file() and that
 * result is returned.
 * NOTE(review): the value returned otherwise is not visible here.
 */
506 bool discard_attribute_spool(JCR *jcr)
508 if (are_attributes_spooled(jcr)) {
509 return close_attr_spool_file(jcr, jcr->dir_bsock);
/*
 * Subtract `size` bytes from spool_stats.attr_size, clamping the total at
 * zero. This function is handed to bnet_despool_to_bsock() by
 * commit_attribute_spool().
 * NOTE(review): any locking around the update is not visible here.
 */
514 static void update_attr_spool_size(ssize_t size)
518 if ((spool_stats.attr_size - size) > 0) {
519 spool_stats.attr_size -= size;
521 spool_stats.attr_size = 0;
/*
 * Send the spooled attributes to the Director and close the spool file.
 *
 * When attributes are spooled, the file size is obtained by seeking to the
 * end and calling ftell(). That size is added to spool_stats.attr_size
 * (raising max_attr_size when a new maximum is reached), the file is
 * despooled to the Director socket with bnet_despool_to_bsock(), passing
 * update_attr_spool_size and the size, and the result of
 * close_attr_spool_file() is returned.
 *
 * NOTE(review): what happens after the fseek failure message, any check of
 * the ftell() result, and the return value when nothing is spooled are not
 * visible here -- confirm.
 */
527 bool commit_attribute_spool(JCR *jcr)
532 if (are_attributes_spooled(jcr)) {
533 if (fseek(jcr->dir_bsock->spool_fd, 0, SEEK_END) != 0) {
535 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
538 size = ftell(jcr->dir_bsock->spool_fd);
541 if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
542 spool_stats.max_attr_size = spool_stats.attr_size + size;
545 spool_stats.attr_size += size;
547 Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
548 edit_uint64_with_commas(size, ec1));
549 bnet_despool_to_bsock(jcr->dir_bsock, update_attr_spool_size, size);
550 return close_attr_spool_file(jcr, jcr->dir_bsock);
/*
 * Build the attribute spool file name into *name with Mmsg(), in the form
 *    <working_directory>/<my_name>.attr.spool.<string>.<integer>
 * NOTE(review): the last two format arguments are not visible here;
 * presumably they are jcr->Job and fd -- confirm.
 */
555 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
557 Mmsg(name, "%s/%s.attr.spool.%s.%d", working_directory, my_name,
/*
 * Create the attribute spool file for a socket and attach it as bs->spool_fd.
 *
 * The name is derived from bs->fd and the file is opened with mode "w+". On
 * failure an M_FATAL job message is queued; on success
 * spool_stats.attr_jobs is incremented. The temporary name buffer is
 * released on both paths.
 * NOTE(review): the fopen() result test and the return statements are not
 * visible here.
 */
562 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
564 POOLMEM *name = get_pool_memory(PM_MESSAGE);
566 make_unique_spool_filename(jcr, &name, bs->fd);
567 bs->spool_fd = fopen(mp_chr(name), "w+");
570 Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
572 free_pool_memory(name);
576 spool_stats.attr_jobs++;
578 free_pool_memory(name);
/*
 * Close and delete the attribute spool file attached to a socket.
 *
 * Decrements the active attribute-job count and increments the total, then
 * rebuilds the file name from bs->fd, closes bs->spool_fd, unlinks the file
 * and releases the name buffer.
 * NOTE(review): any guard on bs->spool_fd, whether it is reset after
 * fclose(), and the return statement are not visible here.
 */
582 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
589 name = get_pool_memory(PM_MESSAGE);
591 spool_stats.attr_jobs--;
592 spool_stats.total_attr_jobs++;
594 make_unique_spool_filename(jcr, &name, bs->fd);
595 fclose(bs->spool_fd);
596 unlink(mp_chr(name));
597 free_pool_memory(name);