]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
Backport from BEE
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2004-2014 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from many
7    others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    Bacula® is a registered trademark of Kern Sibbald.
15 */
16 /*
17  *  Spooling code
18  *
19  *      Kern Sibbald, March 2004
20  *
21  */
22
23 #include "bacula.h"
24 #include "stored.h"
25
26 /* Forward referenced subroutines */
27 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
28 static bool open_data_spool_file(DCR *dcr);
29 static bool close_data_spool_file(DCR *dcr);
30 static bool despool_data(DCR *dcr, bool commit);
31 static int  read_block_from_spool_file(DCR *dcr);
32 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
33 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
34 static bool write_spool_header(DCR *dcr);
35 static bool write_spool_data(DCR *dcr);
36
37 struct spool_stats_t {
38    uint32_t data_jobs;                /* current jobs spooling data */
39    uint32_t attr_jobs;
40    uint32_t total_data_jobs;          /* total jobs to have spooled data */
41    uint32_t total_attr_jobs;
42    int64_t max_data_size;             /* max data size */
43    int64_t max_attr_size;
44    int64_t data_size;                 /* current data size (all jobs running) */
45    int64_t attr_size;
46 };
47
48 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
49 spool_stats_t spool_stats;
50
51 /*
52  * Header for data spool record */
53 struct spool_hdr {
54    int32_t  FirstIndex;               /* FirstIndex for buffer */
55    int32_t  LastIndex;                /* LastIndex for buffer */
56    uint32_t len;                      /* length of next buffer */
57 };
58
59 enum {
60    RB_EOT = 1,
61    RB_ERROR,
62    RB_OK
63 };
64
65 void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
66 {
67    char ed1[30], ed2[30];
68    POOL_MEM msg(PM_MESSAGE);
69    int len;
70
71    len = Mmsg(msg, _("Spooling statistics:\n"));
72
73    if (spool_stats.data_jobs || spool_stats.max_data_size) {
74       len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
75          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
76          spool_stats.total_data_jobs,
77          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
78
79       sendit(msg.c_str(), len, arg);
80    }
81    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
82       len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
83          spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
84          spool_stats.total_attr_jobs,
85          edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
86
87       sendit(msg.c_str(), len, arg);
88    }
89 }
90
91 bool begin_data_spool(DCR *dcr)
92 {
93    bool stat = true;
94    if (!dcr->dev->is_dvd() && dcr->jcr->spool_data) {
95       Dmsg0(100, "Turning on data spooling\n");
96       dcr->spool_data = true;
97       stat = open_data_spool_file(dcr);
98       if (stat) {
99          dcr->spooling = true;
100          Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
101          P(mutex);
102          spool_stats.data_jobs++;
103          V(mutex);
104       }
105    }
106    return stat;
107 }
108
109 bool discard_data_spool(DCR *dcr)
110 {
111    if (dcr->spooling) {
112       Dmsg0(100, "Data spooling discarded\n");
113       return close_data_spool_file(dcr);
114    }
115    return true;
116 }
117
118 bool commit_data_spool(DCR *dcr)
119 {
120    bool stat;
121
122    if (dcr->spooling) {
123       Dmsg0(100, "Committing spooled data\n");
124       stat = despool_data(dcr, true /*commit*/);
125       if (!stat) {
126          Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
127          close_data_spool_file(dcr);
128          return false;
129       }
130       return close_data_spool_file(dcr);
131    }
132    return true;
133 }
134
135 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
136 {
137    const char *dir;
138    if (dcr->dev->device->spool_directory) {
139       dir = dcr->dev->device->spool_directory;
140    } else {
141       dir = working_directory;
142    }
143    Mmsg(name, "%s/%s.data.%u.%s.%s.spool", dir, my_name, dcr->jcr->JobId,
144         dcr->jcr->Job, dcr->device->hdr.name);
145 }
146
147
148 static bool open_data_spool_file(DCR *dcr)
149 {
150    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
151    int spool_fd;
152
153    make_unique_data_spool_filename(dcr, &name);
154    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
155       dcr->spool_fd = spool_fd;
156       dcr->jcr->spool_attributes = true;
157    } else {
158       berrno be;
159       Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
160            be.bstrerror());
161       free_pool_memory(name);
162       return false;
163    }
164    Dmsg1(100, "Created spool file: %s\n", name);
165    free_pool_memory(name);
166    return true;
167 }
168
169 static const char *spool_name = "*spool*";
170
171 /*
172  * NB! This routine locks the device, but if committing will
173  *     not unlock it. If not committing, it will be unlocked.
174  */
175 static bool despool_data(DCR *dcr, bool commit)
176 {
177    DEVICE *rdev;
178    DCR *rdcr;
179    bool ok = true;
180    DEV_BLOCK *block;
181    JCR *jcr = dcr->jcr;
182    int stat;
183    char ec1[50];
184
185    Dmsg0(100, "Despooling data\n");
186    if (jcr->dcr->job_spool_size == 0) {
187       Jmsg(jcr, M_WARNING, 0, _("Despooling zero bytes. Your disk is probably FULL!\n"));
188    }
189
190    /*
191     * Commit means that the job is done, so we commit, otherwise, we
192     *  are despooling because of user spool size max or some error
193     *  (e.g. filesystem full).
194     */
195    if (commit) {
196       Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
197          jcr->dcr->VolumeName,
198          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
199       jcr->setJobStatus(JS_DataCommitting);
200    } else {
201       Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
202          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
203       jcr->setJobStatus(JS_DataDespooling);
204    }
205    jcr->sendJobStatus(JS_DataDespooling);
206    dcr->despool_wait = true;
207    dcr->spooling = false;
208    /*
209     * We work with device blocked, but not locked so that
210     *  other threads -- e.g. reservations can lock the device
211     *  structure.
212     */
213    dcr->dblock(BST_DESPOOLING);
214    dcr->despool_wait = false;
215    dcr->despooling = true;
216
217    /*
218     * This is really quite kludgy and should be fixed some time.
219     * We create a dev structure to read from the spool file
220     * in rdev and rdcr.
221     */
222    rdev = (DEVICE *)malloc(sizeof(DEVICE));
223    memset(rdev, 0, sizeof(DEVICE));
224    rdev->dev_name = get_memory(strlen(spool_name)+1);
225    bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
226    rdev->errmsg = get_pool_memory(PM_EMSG);
227    *rdev->errmsg = 0;
228    rdev->max_block_size = dcr->dev->max_block_size;
229    rdev->min_block_size = dcr->dev->min_block_size;
230    rdev->device = dcr->dev->device;
231    rdcr = new_dcr(jcr, NULL, rdev, SD_READ);
232    rdcr->spool_fd = dcr->spool_fd;
233    block = dcr->block;                /* save block */
234    dcr->block = rdcr->block;          /* make read and write block the same */
235
236    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
237    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
238
239 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
240    posix_fadvise(rdcr->spool_fd, 0, 0, POSIX_FADV_WILLNEED);
241 #endif
242
243    /* Add run time, to get current wait time */
244    int32_t despool_start = time(NULL) - jcr->run_time;
245
246    set_new_file_parameters(dcr);
247
248    for ( ; ok; ) {
249       if (job_canceled(jcr)) {
250          ok = false;
251          break;
252       }
253       stat = read_block_from_spool_file(rdcr);
254       if (stat == RB_EOT) {
255          break;
256       } else if (stat == RB_ERROR) {
257          ok = false;
258          break;
259       }
260       ok = dcr->write_block_to_device();
261       if (!ok) {
262          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
263                dcr->dev->print_name(), dcr->dev->bstrerror());
264          Pmsg2(000, "Fatal append error on device %s: ERR=%s\n",
265                dcr->dev->print_name(), dcr->dev->bstrerror());
266          jcr->forceJobStatus(JS_FatalError);
267       }
268       Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
269    }
270
271    if (!dir_create_jobmedia_record(dcr)) {
272       Jmsg2(jcr, M_FATAL, 0, _("Could not create JobMedia record for Volume=\"%s\" Job=%s\n"),
273          dcr->getVolCatName(), jcr->Job);
274       jcr->forceJobStatus(JS_FatalError);
275    }
276    /* Set new file/block parameters for current dcr */
277    set_new_file_parameters(dcr);
278
279    /*
280     * Subtracting run_time give us elapsed time - wait_time since
281     * we started despooling. Note, don't use time_t as it is 32 or 64
282     * bits depending on the OS and doesn't edit with %d
283     */
284    int32_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
285
286    if (despool_elapsed <= 0) {
287       despool_elapsed = 1;
288    }
289
290    Jmsg(jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s Bytes/second\n"),
291          despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
292          edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
293
294    dcr->block = block;                /* reset block */
295
296    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
297    if (ftruncate(rdcr->spool_fd, 0) != 0) {
298       berrno be;
299       Jmsg(jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
300          be.bstrerror());
301       /* Note, try continuing despite ftruncate problem */
302    }
303
304    P(mutex);
305    if (spool_stats.data_size < dcr->job_spool_size) {
306       spool_stats.data_size = 0;
307    } else {
308       spool_stats.data_size -= dcr->job_spool_size;
309    }
310    V(mutex);
311    P(dcr->dev->spool_mutex);
312    dcr->dev->spool_size -= dcr->job_spool_size;
313    dcr->job_spool_size = 0;            /* zap size in input dcr */
314    V(dcr->dev->spool_mutex);
315    free_memory(rdev->dev_name);
316    free_pool_memory(rdev->errmsg);
317    /* Be careful to NULL the jcr and free rdev after free_dcr() */
318    rdcr->jcr = NULL;
319    rdcr->set_dev(NULL);
320    free_dcr(rdcr);
321    free(rdev);
322    dcr->spooling = true;           /* turn on spooling again */
323    dcr->despooling = false;
324    /*
325     * Note, if committing we leave the device blocked. It will be removed in
326     *  release_device();
327     */
328    if (!commit) {
329       dcr->dev->dunblock();
330    }
331    jcr->sendJobStatus(JS_Running);
332    return ok;
333 }
334
335 /*
336  * Read a block from the spool file
337  *
338  *  Returns RB_OK on success
339  *          RB_EOT when file done
340  *          RB_ERROR on error
341  */
342 static int read_block_from_spool_file(DCR *dcr)
343 {
344    uint32_t rlen;
345    ssize_t stat;
346    spool_hdr hdr;
347    DEV_BLOCK *block = dcr->block;
348    JCR *jcr = dcr->jcr;
349
350    rlen = sizeof(hdr);
351    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
352    if (stat == 0) {
353       Dmsg0(100, "EOT on spool read.\n");
354       return RB_EOT;
355    } else if (stat != (ssize_t)rlen) {
356       if (stat == -1) {
357          berrno be;
358          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
359               be.bstrerror());
360       } else {
361          Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
362          Jmsg2(jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
363       }
364       jcr->forceJobStatus(JS_FatalError);
365       return RB_ERROR;
366    }
367    rlen = hdr.len;
368    if (rlen > block->buf_len) {
369       Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
370       Jmsg2(jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
371       jcr->forceJobStatus(JS_FatalError);
372       return RB_ERROR;
373    }
374    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
375    if (stat != (ssize_t)rlen) {
376       Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
377       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
378       jcr->forceJobStatus(JS_FatalError);
379       return RB_ERROR;
380    }
381    /* Setup write pointers */
382    block->binbuf = rlen;
383    block->bufp = block->buf + block->binbuf;
384    block->FirstIndex = hdr.FirstIndex;
385    block->LastIndex = hdr.LastIndex;
386    block->VolSessionId = dcr->jcr->VolSessionId;
387    block->VolSessionTime = dcr->jcr->VolSessionTime;
388    Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
389    return RB_OK;
390 }
391
392 /*
393  * Write a block to the spool file
394  *
395  *  Returns: true on success or EOT
396  *           false on hard error
397  */
398 bool write_block_to_spool_file(DCR *dcr)
399 {
400    uint32_t wlen, hlen;               /* length to write */
401    bool despool = false;
402    DEV_BLOCK *block = dcr->block;
403
404    if (job_canceled(dcr->jcr)) {
405       return false;
406    }
407    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
408    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
409       return true;
410    }
411
412    hlen = sizeof(spool_hdr);
413    wlen = block->binbuf;
414    P(dcr->dev->spool_mutex);
415    dcr->job_spool_size += hlen + wlen;
416    dcr->dev->spool_size += hlen + wlen;
417    if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
418        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
419       despool = true;
420    }
421    V(dcr->dev->spool_mutex);
422    P(mutex);
423    spool_stats.data_size += hlen + wlen;
424    if (spool_stats.data_size > spool_stats.max_data_size) {
425       spool_stats.max_data_size = spool_stats.data_size;
426    }
427    V(mutex);
428    if (despool) {
429       char ec1[30], ec2[30];
430       if (dcr->max_job_spool_size > 0) {
431          Jmsg(dcr->jcr, M_INFO, 0, _("User specified Job spool size reached: "
432             "JobSpoolSize=%s MaxJobSpoolSize=%s\n"),
433             edit_uint64_with_commas(dcr->job_spool_size, ec1),
434             edit_uint64_with_commas(dcr->max_job_spool_size, ec2));
435       } else {
436          Jmsg(dcr->jcr, M_INFO, 0, _("User specified Device spool size reached: "
437             "DevSpoolSize=%s MaxDevSpoolSize=%s\n"),
438             edit_uint64_with_commas(dcr->dev->spool_size, ec1),
439             edit_uint64_with_commas(dcr->dev->max_spool_size, ec2));
440       }
441
442       if (!despool_data(dcr, false)) {
443          Pmsg0(000, _("Bad return from despool in write_block.\n"));
444          return false;
445       }
446       /* Despooling cleared these variables so reset them */
447       P(dcr->dev->spool_mutex);
448       dcr->job_spool_size += hlen + wlen;
449       dcr->dev->spool_size += hlen + wlen;
450       V(dcr->dev->spool_mutex);
451       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
452    }
453
454
455    if (!write_spool_header(dcr)) {
456       return false;
457    }
458    if (!write_spool_data(dcr)) {
459      return false;
460    }
461
462    Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
463    empty_block(block);
464    return true;
465 }
466
467 static bool write_spool_header(DCR *dcr)
468 {
469    spool_hdr hdr;
470    ssize_t stat;
471    DEV_BLOCK *block = dcr->block;
472    JCR *jcr = dcr->jcr;
473
474    hdr.FirstIndex = block->FirstIndex;
475    hdr.LastIndex = block->LastIndex;
476    hdr.len = block->binbuf;
477
478    /* Write header */
479    for (int retry=0; retry<=1; retry++) {
480       stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
481       if (stat == -1) {
482          berrno be;
483          Jmsg(jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
484               be.bstrerror());
485          jcr->forceJobStatus(JS_FatalError);
486       }
487       if (stat != (ssize_t)sizeof(hdr)) {
488          Jmsg(jcr, M_ERROR, 0, _("Error writing header to spool file."
489               " Disk probably full. Attempting recovery. Wanted to write=%d got=%d\n"),
490               (int)stat, (int)sizeof(hdr));
491          /* If we wrote something, truncate it, then despool */
492          if (stat != -1) {
493 #if defined(HAVE_WIN32)
494             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
495 #else
496             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
497 #endif
498             if (ftruncate(dcr->spool_fd, pos - stat) != 0) {
499                berrno be;
500                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
501                   be.bstrerror());
502               /* Note, try continuing despite ftruncate problem */
503             }
504          }
505          if (!despool_data(dcr, false)) {
506             Jmsg(jcr, M_FATAL, 0, _("Fatal despooling error."));
507             jcr->forceJobStatus(JS_FatalError);
508             return false;
509          }
510          continue;                    /* try again */
511       }
512       return true;
513    }
514    Jmsg(jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
515    jcr->forceJobStatus(JS_FatalError);
516    return false;
517 }
518
519 static bool write_spool_data(DCR *dcr)
520 {
521    ssize_t stat;
522    DEV_BLOCK *block = dcr->block;
523    JCR *jcr = dcr->jcr;
524
525    /* Write data */
526    for (int retry=0; retry<=1; retry++) {
527       stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
528       if (stat == -1) {
529          berrno be;
530          Jmsg(jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
531               be.bstrerror());
532          jcr->forceJobStatus(JS_FatalError);
533       }
534       if (stat != (ssize_t)block->binbuf) {
535          /*
536           * If we wrote something, truncate it and the header, then despool
537           */
538          if (stat != -1) {
539 #if defined(HAVE_WIN32)
540             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
541 #else
542             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
543 #endif
544             if (ftruncate(dcr->spool_fd, pos - stat - sizeof(spool_hdr)) != 0) {
545                berrno be;
546                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
547                   be.bstrerror());
548                /* Note, try continuing despite ftruncate problem */
549             }
550          }
551          if (!despool_data(dcr, false)) {
552             Jmsg(jcr, M_FATAL, 0, _("Fatal despooling error."));
553             jcr->forceJobStatus(JS_FatalError);
554             return false;
555          }
556          if (!write_spool_header(dcr)) {
557             return false;
558          }
559          continue;                    /* try again */
560       }
561       return true;
562    }
563    Jmsg(jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
564    jcr->forceJobStatus(JS_FatalError);
565    return false;
566 }
567
568 static bool close_data_spool_file(DCR *dcr)
569 {
570    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
571
572    P(mutex);
573    spool_stats.data_jobs--;
574    spool_stats.total_data_jobs++;
575    if (spool_stats.data_size < dcr->job_spool_size) {
576       spool_stats.data_size = 0;
577    } else {
578       spool_stats.data_size -= dcr->job_spool_size;
579    }
580    V(mutex);
581    P(dcr->dev->spool_mutex);
582    dcr->job_spool_size = 0;
583    V(dcr->dev->spool_mutex);
584
585    make_unique_data_spool_filename(dcr, &name);
586    close(dcr->spool_fd);
587    dcr->spool_fd = -1;
588    dcr->spooling = false;
589    unlink(name);
590    Dmsg1(100, "Deleted spool file: %s\n", name);
591    free_pool_memory(name);
592    return true;
593 }
594
595 bool are_attributes_spooled(JCR *jcr)
596 {
597    return jcr->spool_attributes && jcr->dir_bsock->m_spool_fd;
598 }
599
600 /*
601  * Create spool file for attributes.
602  *  This is done by "attaching" to the bsock, and when
603  *  it is called, the output is written to a file.
604  *  The actual spooling is turned on and off in
605  *  append.c only during writing of the attributes.
606  */
607 bool begin_attribute_spool(JCR *jcr)
608 {
609    if (!jcr->no_attributes && jcr->spool_attributes) {
610       return open_attr_spool_file(jcr, jcr->dir_bsock);
611    }
612    return true;
613 }
614
615 static void update_attr_spool_size(ssize_t size)
616 {
617    P(mutex);
618    if (size > 0) {
619      if ((spool_stats.attr_size - size) > 0) {
620         spool_stats.attr_size -= size;
621      } else {
622         spool_stats.attr_size = 0;
623      }
624    }
625    V(mutex);
626 }
627
628 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
629 {
630    Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
631       jcr->Job, fd);
632 }
633
634 /*
635  * Tell Director where to find the attributes spool file
636  *  Note, if we are not on the same machine, the Director will
637  *  return an error, and the higher level routine will transmit
638  *  the data record by record -- using bsock->despool().
639  */
640 static bool blast_attr_spool_file(JCR *jcr, boffset_t size)
641 {
642    /* send full spool file name */
643    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
644    make_unique_spool_filename(jcr, &name, jcr->dir_bsock->m_fd);
645    bash_spaces(name);
646    jcr->dir_bsock->fsend("BlastAttr Job=%s File=%s\n", jcr->Job, name);
647    free_pool_memory(name);
648
649    if (jcr->dir_bsock->recv() <= 0) {
650       Jmsg(jcr, M_FATAL, 0, _("Network error on BlastAttributes.\n"));
651       jcr->forceJobStatus(JS_FatalError);
652       return false;
653    }
654
655    if (!bstrcmp(jcr->dir_bsock->msg, "1000 OK BlastAttr\n")) {
656       return false;
657    }
658    return true;
659 }
660
661 bool commit_attribute_spool(JCR *jcr)
662 {
663    boffset_t size;
664    char ec1[30];
665    char tbuf[100];
666    BSOCK *dir;
667
668    Dmsg1(100, "Commit attributes at %s\n", bstrftimes(tbuf, sizeof(tbuf),
669          (utime_t)time(NULL)));
670    if (are_attributes_spooled(jcr)) {
671       dir = jcr->dir_bsock;
672       if (fseeko(dir->m_spool_fd, 0, SEEK_END) != 0) {
673          berrno be;
674          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
675               be.bstrerror());
676          jcr->forceJobStatus(JS_FatalError);
677          goto bail_out;
678       }
679       size = ftello(dir->m_spool_fd);
680       if (size < 0) {
681          berrno be;
682          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
683               be.bstrerror());
684          jcr->forceJobStatus(JS_FatalError);
685          goto bail_out;
686       }
687       P(mutex);
688       if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
689          spool_stats.max_attr_size = spool_stats.attr_size + size;
690       }
691       spool_stats.attr_size += size;
692       V(mutex);
693       jcr->sendJobStatus(JS_AttrDespooling);
694       Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
695             edit_uint64_with_commas(size, ec1));
696
697       if (!blast_attr_spool_file(jcr, size)) {
698          /* Can't read spool file from director side,
699           * send content over network.
700           */
701          dir->despool(update_attr_spool_size, size);
702       }
703       return close_attr_spool_file(jcr, dir);
704    }
705    return true;
706
707 bail_out:
708    close_attr_spool_file(jcr, dir);
709    return false;
710 }
711
712 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
713 {
714    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
715
716    make_unique_spool_filename(jcr, &name, bs->m_fd);
717    bs->m_spool_fd = fopen(name, "w+b");
718    if (!bs->m_spool_fd) {
719       berrno be;
720       Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
721            be.bstrerror());
722       jcr->forceJobStatus(JS_FatalError);
723       free_pool_memory(name);
724       return false;
725    }
726    P(mutex);
727    spool_stats.attr_jobs++;
728    V(mutex);
729    free_pool_memory(name);
730    return true;
731 }
732
733 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
734 {
735    POOLMEM *name;
736
737    char tbuf[100];
738
739    Dmsg1(100, "Close attr spool file at %s\n", bstrftimes(tbuf, sizeof(tbuf),
740          (utime_t)time(NULL)));
741    if (!bs->m_spool_fd) {
742       return true;
743    }
744    name = get_pool_memory(PM_MESSAGE);
745    P(mutex);
746    spool_stats.attr_jobs--;
747    spool_stats.total_attr_jobs++;
748    V(mutex);
749    make_unique_spool_filename(jcr, &name, bs->m_fd);
750    fclose(bs->m_spool_fd);
751    unlink(name);
752    free_pool_memory(name);
753    bs->m_spool_fd = NULL;
754    bs->clear_spooling();
755    return true;
756 }
757
758 bool discard_attribute_spool(JCR *jcr)
759 {
760    if (are_attributes_spooled(jcr)) {
761       return close_attr_spool_file(jcr, jcr->dir_bsock);
762    }
763    return true;
764 }