4 * Kern Sibbald, March 2004
9 Copyright (C) 2004-2005 Kern Sibbald
11 This program is free software; you can redistribute it and/or
12 modify it under the terms of the GNU General Public License
13 version 2 as amended with additional clauses defined in the
14 file LICENSE in the main source directory.
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
19 the file LICENSE for additional details.
26 /* Forward referenced subroutines */
/*
 * All of these are declared static, i.e. private to this file.  The
 * data-spool helpers take the job's DCR; the attribute-spool helpers take
 * the JCR plus the BSOCK the attribute spool file is attached to.
 */
27 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
28 static bool open_data_spool_file(DCR *dcr);
29 static bool close_data_spool_file(DCR *dcr);
30 static bool despool_data(DCR *dcr, bool commit);
31 static int read_block_from_spool_file(DCR *dcr);
32 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
33 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
34 static bool write_spool_header(DCR *dcr);
35 static bool write_spool_data(DCR *dcr);
/*
 * Counters for data and attribute spooling.  The one instance is the
 * global spool_stats, which list_spool_stats() reports from.
 * NOTE(review): attr_jobs and attr_size are used elsewhere in this file,
 * but their declarations are not among the lines visible here.
 */
37 struct spool_stats_t {
38 uint32_t data_jobs; /* current jobs spooling data */
40 uint32_t total_data_jobs; /* total jobs to have spooled data */
/* incremented in close_attr_spool_file() */
41 uint32_t total_attr_jobs;
42 int64_t max_data_size; /* max data size */
/* largest (attr_size + size being despooled) seen; set in commit_attribute_spool() */
43 int64_t max_attr_size;
44 int64_t data_size; /* current data size (all jobs running) */
/*
 * File-local mutex.
 * NOTE(review): no lock/unlock of this mutex is visible in the lines shown;
 * presumably it guards updates to spool_stats -- confirm against the full file.
 */
48 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/* Spooling statistics for the whole process; not static, so it has external linkage. */
49 spool_stats_t spool_stats;
52 * Header for data spool record */
/*
 * write_spool_header() fills one of these from the current block and writes
 * it to the spool file immediately before the block data;
 * read_block_from_spool_file() reads it back and restores FirstIndex and
 * LastIndex on the block.
 */
54 int32_t FirstIndex; /* FirstIndex for buffer */
55 int32_t LastIndex; /* LastIndex for buffer */
56 uint32_t len; /* length of next buffer */
/*
 * Send a human-readable summary of the spooling statistics over bs.
 *
 * A "Data spooling" line is sent when data_jobs or max_data_size is
 * non-zero, and an "Attr spooling" line when attr_jobs or max_attr_size is
 * non-zero; with all four at zero nothing is sent.
 *
 * NOTE(review): spool_stats is read here with no locking visible in the
 * lines shown -- confirm whether a consistent snapshot matters.
 */
65 void list_spool_stats(BSOCK *bs)
/* scratch buffers that receive the comma-edited numbers */
67 char ed1[30], ed2[30];
68 if (spool_stats.data_jobs || spool_stats.max_data_size) {
69 bnet_fsend(bs, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
70 spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
71 spool_stats.total_data_jobs,
72 edit_uint64_with_commas(spool_stats.max_data_size, ed2));
74 if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
75 bnet_fsend(bs, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
76 spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
77 spool_stats.total_attr_jobs,
78 edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
/*
 * Turn on data spooling for this DCR when the job requested it
 * (dcr->jcr->spool_data): mark the DCR, open the spool file, tell the user
 * and count one more active data-spooling job.
 *
 * Returns the status of open_data_spool_file() when spooling was requested.
 * NOTE(review): the declaration of stat, the test of its value, any locking
 * around the counter and the return statement are on lines not visible
 * here -- confirm the counter is only bumped after a successful open.
 */
82 bool begin_data_spool(DCR *dcr)
85 if (dcr->jcr->spool_data) {
86 Dmsg0(100, "Turning on data spooling\n");
87 dcr->spool_data = true;
88 stat = open_data_spool_file(dcr);
91 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
93 spool_stats.data_jobs++;
/*
 * Throw away the spooled data without writing it to the Volume: logs at
 * debug level 100 and returns the result of close_data_spool_file().
 * NOTE(review): any guard condition and the alternative return are on lines
 * not visible here.
 */
100 bool discard_data_spool(DCR *dcr)
103 Dmsg0(100, "Data spooling discarded\n");
104 return close_data_spool_file(dcr);
/*
 * Write all spooled data to the Volume (despool_data() with commit=true)
 * and then close the data spool file.
 *
 * The "Bad return from despool" message and the close_data_spool_file()
 * call that follows it appear to be the failure path: the spool file is
 * closed even when despooling fails.  The success path returns the result
 * of close_data_spool_file().
 * NOTE(review): the conditions and the failure-path return are on lines not
 * visible here.
 */
109 bool commit_data_spool(DCR *dcr)
114 Dmsg0(100, "Committing spooled data\n");
115 stat = despool_data(dcr, true /*commit*/);
117 Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
118 close_data_spool_file(dcr);
121 return close_data_spool_file(dcr);
/*
 * Format the data spool file path into *name as
 *    <dir>/<my_name>.data.<Job>.<device name>.spool
 * where <dir> is the device resource's spool_directory when that is set;
 * working_directory is the other value assigned to dir (presumably the
 * fallback -- the else line is not visible here).
 *
 * NOTE(review): the directory is taken via dcr->dev->device while the
 * device name is taken via dcr->device; confirm both refer to the same
 * device resource.
 */
126 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
129 if (dcr->dev->device->spool_directory) {
130 dir = dcr->dev->device->spool_directory;
132 dir = working_directory;
134 Mmsg(name, "%s/%s.data.%s.%s.spool", dir, my_name, dcr->jcr->Job,
135 dcr->device->hdr.name);
/*
 * Create (or truncate) this job's data spool file and remember its
 * descriptor in dcr->spool_fd.
 *
 * The file is opened read/write in binary mode with permissions 0640.  A
 * successful open also switches on attribute spooling for the job
 * (jcr->spool_attributes = true).  When the open fails an M_FATAL job
 * message naming the file is queued.  The temporary name buffer is released
 * on both paths.
 * NOTE(review): the return statements are on lines not visible here.
 */
139 static bool open_data_spool_file(DCR *dcr)
141 POOLMEM *name = get_pool_memory(PM_MESSAGE);
144 make_unique_data_spool_filename(dcr, &name);
145 if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
146 dcr->spool_fd = spool_fd;
147 dcr->jcr->spool_attributes = true;
150 Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
152 free_pool_memory(name);
155 Dmsg1(100, "Created spool file: %s\n", name);
156 free_pool_memory(name);
/*
 * Finish data spooling for this DCR: update the global statistics, close
 * the spool file descriptor and clear dcr->spooling.
 *
 * Statistics: one fewer active job, one more completed job, and
 * job_spool_size is taken off spool_stats.data_size (clamped at zero so it
 * cannot go negative) before job_spool_size is reset.
 *
 * NOTE(review): the debug message says the file was deleted, but the call
 * that removes it, any locking around the statistics and the return
 * statement are on lines not visible here.  The result of close() is not
 * checked.
 */
160 static bool close_data_spool_file(DCR *dcr)
162 POOLMEM *name = get_pool_memory(PM_MESSAGE);
165 spool_stats.data_jobs--;
166 spool_stats.total_data_jobs++;
167 if (spool_stats.data_size < dcr->job_spool_size) {
168 spool_stats.data_size = 0;
170 spool_stats.data_size -= dcr->job_spool_size;
172 dcr->job_spool_size = 0;
175 make_unique_data_spool_filename(dcr, &name);
176 close(dcr->spool_fd);
178 dcr->spooling = false;
180 Dmsg1(100, "Deleted spool file: %s\n", name);
181 free_pool_memory(name);
/* Device name given to the temporary read device built in despool_data(). */
185 static const char *spool_name = "*spool*";

/*
 * Copy everything currently in the data spool file to the real device,
 * then empty the spool file so spooling can continue.
 *
 *   dcr     DCR of the job whose spool file is despooled
 *   commit  selects which of the two informational job messages is issued
 *           ("Committing ..." or "Writing ...")
 *
 * Outline of the visible steps:
 *   - spooling is switched off and the device locked for the duration;
 *   - a throw-away DEVICE/DCR pair (rdev/rdcr) is built so that
 *     read_block_from_spool_file() can read from the spool descriptor;
 *   - dcr->block is temporarily pointed at rdcr->block, so each block read
 *     from the spool file is what write_block_to_device(dcr) writes;
 *   - afterwards the original block is restored, the spool file is rewound
 *     and truncated to zero, and the size counters are reduced;
 *   - the device is unlocked and spooling switched back on.
 *
 * NOTE(review): the loop construct, the exits on cancel/EOT/error, the
 * release of rdcr/rdev and the return statement are on lines not visible
 * here.
 */
187 static bool despool_data(DCR *dcr, bool commit)
197 Dmsg0(100, "Despooling data\n");
/* NOTE(review): the size reported is read through jcr->dcr rather than the dcr argument -- confirm they are always the same DCR. */
199 Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume. Despooling %s bytes ...\n"),
200 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
203 Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
204 edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
206 dcr->spooling = false;
207 lock_device(dcr->dev);
208 dcr->dev_locked = true;
211 * This is really quite kludgy and should be fixed some time.
212 * We create a dev structure to read from the spool file
/* NOTE(review): the malloc() result is passed straight to memset() with no NULL check visible. */
215 rdev = (DEVICE *)malloc(sizeof(DEVICE));
216 memset(rdev, 0, sizeof(DEVICE));
217 rdev->dev_name = get_memory(strlen(spool_name)+1);
/*
 * NOTE(review): dev_name is assigned from get_memory() above, so it looks
 * like a pointer; if so, sizeof(rdev->dev_name) is the size of that pointer,
 * not of the buffer.  "*spool*" plus its NUL is 8 bytes, which only fits
 * when pointers are 8 bytes wide; with 4-byte pointers the name would be
 * truncated.  The bound should presumably be strlen(spool_name)+1 --
 * confirm against the DEVICE declaration.
 */
218 bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
219 rdev->errmsg = get_pool_memory(PM_EMSG);
/* the read device mirrors the block-size limits and resource of the real device */
221 rdev->max_block_size = dcr->dev->max_block_size;
222 rdev->min_block_size = dcr->dev->min_block_size;
223 rdev->device = dcr->dev->device;
224 rdcr = new_dcr(NULL, rdev);
225 rdcr->spool_fd = dcr->spool_fd;
226 rdcr->jcr = jcr; /* set a valid jcr */
227 block = dcr->block; /* save block */
228 dcr->block = rdcr->block; /* make read and write block the same */
230 Dmsg1(800, "read/write block size = %d\n", block->buf_len);
/* NOTE(review): the result of lseek() is not checked here or at the later rewind. */
231 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
234 if (job_canceled(jcr)) {
238 stat = read_block_from_spool_file(rdcr);
239 if (stat == RB_EOT) {
241 } else if (stat == RB_ERROR) {
245 ok = write_block_to_device(dcr);
247 Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
248 dcr->dev->print_name(), strerror_dev(dcr->dev));
/*
 * NOTE(review): `block` was set above to the saved write block, not to
 * rdcr->block which was just read and written, so the FI/LI logged here may
 * not be those of the block that was written -- confirm.
 */
250 Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
252 dcr->block = block; /* reset block */
/* empty the spool file so that spooling can start again from offset 0 */
254 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
255 if (ftruncate(rdcr->spool_fd, 0) != 0) {
257 Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
259 Pmsg1(000, _("Bad return from ftruncate. ERR=%s\n"), be.strerror());
/* take this job's bytes off the global total, clamping at zero */
264 if (spool_stats.data_size < dcr->job_spool_size) {
265 spool_stats.data_size = 0;
267 spool_stats.data_size -= dcr->job_spool_size;
/* NOTE(review): unlike spool_stats.data_size above, the per-device total is decremented without a clamp. */
270 P(dcr->dev->spool_mutex);
271 dcr->dev->spool_size -= dcr->job_spool_size;
272 dcr->job_spool_size = 0; /* zap size in input dcr */
273 V(dcr->dev->spool_mutex);
274 free_memory(rdev->dev_name);
275 free_pool_memory(rdev->errmsg);
276 /* Be careful to NULL the jcr and free rdev after free_dcr() */
280 unlock_device(dcr->dev);
281 dcr->dev_locked = false;
282 dcr->spooling = true; /* turn on spooling again */
287 * Read a block from the spool file
289 * Returns RB_OK on success
290 * RB_EOT when file done
/*
 * Two reads are issued on dcr->spool_fd: first the record header into hdr,
 * then rlen bytes of block data into block->buf.  A short header read, a
 * length larger than block->buf_len, or a short data read each queue an
 * M_FATAL job message.  On success the block's write pointers, FirstIndex
 * and LastIndex (from the header) and the job's VolSessionId and
 * VolSessionTime are set on the block.
 *
 * NOTE(review): the declarations of rlen, stat and hdr, the assignments to
 * rlen, the end-of-file test and all return statements are on lines not
 * visible here.
 */
293 static int read_block_from_spool_file(DCR *dcr)
298 DEV_BLOCK *block = dcr->block;
301 stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
303 Dmsg0(100, "EOT on spool read.\n");
305 } else if (stat != (ssize_t)rlen) {
308 Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
/*
 * NOTE(review): stat is compared against (ssize_t)rlen; if it is declared
 * ssize_t, the %d conversions used for it in the messages below do not
 * match its width on LP64 platforms -- confirm the declaration.
 */
311 Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
312 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
/* refuse a record that would not fit in the block buffer */
317 if (rlen > block->buf_len) {
318 Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
319 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
322 stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
323 if (stat != (ssize_t)rlen) {
324 Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
325 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
328 /* Setup write pointers */
329 block->binbuf = rlen;
330 block->bufp = block->buf + block->binbuf;
331 block->FirstIndex = hdr.FirstIndex;
332 block->LastIndex = hdr.LastIndex;
333 block->VolSessionId = dcr->jcr->VolSessionId;
334 block->VolSessionTime = dcr->jcr->VolSessionTime;
335 Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
340 * Write a block to the spool file
342 * Returns: true on success or EOT
343 * false on hard error
/*
 * Accounting is done before the write: header length plus block length is
 * added to the job's and the device's spool size under the device's
 * spool_mutex, and to spool_stats.data_size (raising max_data_size when a
 * new maximum is reached).  When a configured job or device limit is
 * reached, the existing spool contents are first despooled to the Volume;
 * because despooling resets the sizes, the length of the block about to be
 * written is added back afterwards.  The block is then written as a header
 * followed by its data.
 *
 * NOTE(review): the handling of an empty block, the setting and testing of
 * `despool`, any locking around spool_stats and the return statements are
 * on lines not visible here.
 */
345 bool write_block_to_spool_file(DCR *dcr)
347 uint32_t wlen, hlen; /* length to write */
348 bool despool = false;
349 DEV_BLOCK *block = dcr->block;
/* sanity check: binbuf must agree with the distance bufp has advanced */
351 ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
352 if (block->binbuf <= WRITE_BLKHDR_LENGTH) { /* Does block have data in it? */
356 hlen = sizeof(spool_hdr);
357 wlen = block->binbuf;
358 P(dcr->dev->spool_mutex);
359 dcr->job_spool_size += hlen + wlen;
360 dcr->dev->spool_size += hlen + wlen;
/* a limit of 0 means "no limit" for both the per-job and the per-device maximum */
361 if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
362 (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
365 V(dcr->dev->spool_mutex);
367 spool_stats.data_size += hlen + wlen;
368 if (spool_stats.data_size > spool_stats.max_data_size) {
369 spool_stats.max_data_size = spool_stats.data_size;
374 char ec1[30], ec2[30], ec3[30], ec4[30];
/*
 * NOTE(review): the labels in this debug message do not match the argument
 * order.  The values passed are job max, job size, device max, device size,
 * but the text labels them max_size, size, max_job_size, job_size.
 */
375 Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
376 "max_job_size=%s job_size=%s\n",
377 edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
378 edit_uint64_with_commas(dcr->job_spool_size, ec2),
379 edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
380 edit_uint64_with_commas(dcr->dev->spool_size, ec4));
382 Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
383 if (!despool_data(dcr, false)) {
384 Pmsg0(000, _("Bad return from despool in write_block.\n"));
387 /* Despooling cleared these variables so reset them */
388 P(dcr->dev->spool_mutex);
389 dcr->job_spool_size += hlen + wlen;
390 dcr->dev->spool_size += hlen + wlen;
391 V(dcr->dev->spool_mutex);
392 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
396 if (!write_spool_header(dcr)) {
399 if (!write_spool_data(dcr)) {
403 Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
/*
 * Write the spool record header for dcr->block to the spool file.
 *
 * The header carries the block's FirstIndex, LastIndex and byte count
 * (binbuf).  The write is attempted at most twice (retry = 0 and 1).  After
 * a short write the file is truncated back by the number of bytes written
 * and the existing spool contents are despooled before the next attempt.
 * The final message is issued once the retry loop is exhausted.
 *
 * NOTE(review): the declarations, the tests on `stat` that select each
 * branch and all return statements are on lines not visible here.
 */
408 static bool write_spool_header(DCR *dcr)
412 DEV_BLOCK *block = dcr->block;
414 hdr.FirstIndex = block->FirstIndex;
415 hdr.LastIndex = block->LastIndex;
416 hdr.len = block->binbuf;
419 for (int retry=0; retry<=1; retry++) {
420 stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
423 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
426 if (stat != (ssize_t)sizeof(hdr)) {
427 /* If we wrote something, truncate it, then despool */
/* new length = current file offset minus the bytes just written */
429 if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR) - stat) != 0) {
431 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
436 if (!despool_data(dcr, false)) {
/* NOTE(review): unlike the other job messages in this function, this one has no trailing newline. */
437 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
440 continue; /* try again */
444 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
/*
 * Write the data of dcr->block (binbuf bytes from block->buf) to the spool
 * file, following a header already written by write_spool_header().
 *
 * The write is attempted at most twice (retry = 0 and 1).  After a short
 * write the file is truncated back past both the partial data and the
 * header that preceded it, the existing spool contents are despooled, and
 * the header is written again before the data write is retried.  The final
 * message is issued once the retry loop is exhausted.
 *
 * NOTE(review): the declarations, the tests on `stat` that select each
 * branch and all return statements are on lines not visible here.
 */
448 static bool write_spool_data(DCR *dcr)
451 DEV_BLOCK *block = dcr->block;
454 for (int retry=0; retry<=1; retry++) {
455 stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
458 Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
461 if (stat != (ssize_t)block->binbuf) {
463 * If we wrote something, truncate it and the header, then despool
/*
 * NOTE(review): this offset expression mixes the lseek() result, stat and
 * the unsigned sizeof result; where off_t is no wider than size_t the
 * arithmetic is carried out unsigned -- confirm this is intended.
 */
466 if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR)
467 - stat - sizeof(spool_hdr)) != 0) {
469 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
474 if (!despool_data(dcr, false)) {
/* NOTE(review): unlike the other job messages in this function, this one has no trailing newline. */
475 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
/* the header was truncated away together with the partial data, so write it again */
478 if (!write_spool_header(dcr)) {
481 continue; /* try again */
485 Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
/*
 * Returns true when attribute spooling is enabled for the job
 * (jcr->spool_attributes) and a spool file is attached to the job's
 * dir_bsock (spool_fd is non-NULL).
 */
491 bool are_attributes_spooled(JCR *jcr)
493 return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
497 * Create spool file for attributes.
498 * This is done by "attaching" to the bsock, and when
499 * it is called, the output is written to a file.
500 * The actual spooling is turned on and off in
501 * append.c only during writing of the attributes.
/*
 * The spool file is opened on jcr->dir_bsock only when the job wants
 * attributes (no_attributes is false) and attribute spooling is enabled; in
 * that case the result of open_attr_spool_file() is returned.
 * NOTE(review): the return for the other case is on a line not visible here.
 */
503 bool begin_attribute_spool(JCR *jcr)
505 if (!jcr->no_attributes && jcr->spool_attributes) {
506 return open_attr_spool_file(jcr, jcr->dir_bsock);
/*
 * Drop any spooled attributes without sending them: when attributes are
 * being spooled, returns the result of closing the attribute spool file on
 * jcr->dir_bsock.
 * NOTE(review): the return for the not-spooled case is on a line not
 * visible here.
 */
511 bool discard_attribute_spool(JCR *jcr)
513 if (are_attributes_spooled(jcr)) {
514 return close_attr_spool_file(jcr, jcr->dir_bsock);
/*
 * Subtract size from spool_stats.attr_size, setting it to zero instead when
 * the result would not be positive.  commit_attribute_spool() hands this
 * function to bnet_despool_to_bsock() as a callback.
 * NOTE(review): any locking around the update is on lines not visible here.
 */
519 static void update_attr_spool_size(ssize_t size)
523 if ((spool_stats.attr_size - size) > 0) {
524 spool_stats.attr_size -= size;
526 spool_stats.attr_size = 0;
/*
 * Send the spooled attributes to the Director and close the attribute
 * spool file.
 *
 * When attributes are being spooled: seek to the end of the spool file to
 * learn its size, add that size to spool_stats.attr_size (raising
 * max_attr_size when the new total exceeds it), announce the despool to the
 * user, stream the file to dir_bsock with bnet_despool_to_bsock(), and
 * return the result of close_attr_spool_file().
 *
 * NOTE(review): the error tests, any locking around spool_stats, the return
 * statements and the context of the final close_attr_spool_file() call are
 * on lines not visible here.
 */
532 bool commit_attribute_spool(JCR *jcr)
537 if (are_attributes_spooled(jcr)) {
538 if (fseeko(jcr->dir_bsock->spool_fd, 0, SEEK_END) != 0) {
540 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
544 size = ftello(jcr->dir_bsock->spool_fd);
/*
 * NOTE(review): this message follows the ftello() call but repeats the
 * "Fseek" wording of the message above; if it reports an ftello() failure
 * the text is misleading -- confirm.
 */
547 Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
552 if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
553 spool_stats.max_attr_size = spool_stats.attr_size + size;
555 spool_stats.attr_size += size;
557 Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
558 edit_uint64_with_commas(size, ec1));
/* NOTE(review): the result of bnet_despool_to_bsock() is not used; a failed despool is not reported from here. */
559 bnet_despool_to_bsock(jcr->dir_bsock, update_attr_spool_size, size);
560 return close_attr_spool_file(jcr, jcr->dir_bsock);
565 close_attr_spool_file(jcr, jcr->dir_bsock);
/*
 * Format the attribute spool file path into *name as
 *    <working_directory>/<my_name>.attr.<string>.<number>.spool
 * NOTE(review): the arguments supplying the last string and the number are
 * on a continuation line not visible here; presumably the number is the fd
 * parameter -- confirm.
 */
569 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
571 Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
/*
 * Create the attribute spool file for this job and attach it to bs.
 *
 * The file name is derived from bs->fd and the file is opened with mode
 * "w+" (read/write, truncated or created); the stream is stored in
 * bs->spool_fd.  On failure an M_FATAL job message naming the file is
 * queued.  Otherwise the count of active attribute-spooling jobs is
 * incremented.  The temporary name buffer is released on both paths.
 *
 * Although `static` is not repeated here, the forward declaration at the
 * top of the file declares this function static.
 * NOTE(review): the failure test, any locking around the counter and the
 * return statements are on lines not visible here.
 */
576 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
578 POOLMEM *name = get_pool_memory(PM_MESSAGE);
580 make_unique_spool_filename(jcr, &name, bs->fd);
581 bs->spool_fd = fopen(name, "w+");
584 Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
586 free_pool_memory(name);
590 spool_stats.attr_jobs++;
592 free_pool_memory(name);
/*
 * Close the attribute spool file attached to bs and update the statistics:
 * one fewer active attribute-spooling job and one more completed one.
 *
 * The file name is rebuilt from bs->fd, the stream is closed with fclose()
 * and the temporary name buffer is released.
 *
 * NOTE(review): any early exit, locking around the counters, removal of the
 * file, resetting of bs->spool_fd and the return statement are on lines not
 * visible here.  The result of fclose() is not checked.
 */
596 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
603 name = get_pool_memory(PM_MESSAGE);
605 spool_stats.attr_jobs--;
606 spool_stats.total_attr_jobs++;
608 make_unique_spool_filename(jcr, &name, bs->fd);
609 fclose(bs->spool_fd);
611 free_pool_memory(name);