]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
Merge branch 'master' into basejobv3
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2004-2009 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  *  Spooling code
30  *
31  *      Kern Sibbald, March 2004
32  *
33  *  Version $Id$
34  */
35
36 #include "bacula.h"
37 #include "stored.h"
38
39 /* Forward referenced subroutines */
40 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
41 static bool open_data_spool_file(DCR *dcr);
42 static bool close_data_spool_file(DCR *dcr);
43 static bool despool_data(DCR *dcr, bool commit);
44 static int  read_block_from_spool_file(DCR *dcr);
45 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
46 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
47 static bool write_spool_header(DCR *dcr);
48 static bool write_spool_data(DCR *dcr);
49
50 struct spool_stats_t {
51    uint32_t data_jobs;                /* current jobs spooling data */
52    uint32_t attr_jobs;
53    uint32_t total_data_jobs;          /* total jobs to have spooled data */
54    uint32_t total_attr_jobs;
55    int64_t max_data_size;             /* max data size */
56    int64_t max_attr_size;
57    int64_t data_size;                 /* current data size (all jobs running) */
58    int64_t attr_size;
59 };
60
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62 spool_stats_t spool_stats;
63
64 /*
65  * Header for data spool record */
66 struct spool_hdr {
67    int32_t  FirstIndex;               /* FirstIndex for buffer */
68    int32_t  LastIndex;                /* LastIndex for buffer */
69    uint32_t len;                      /* length of next buffer */
70 };
71
72 enum {
73    RB_EOT = 1,
74    RB_ERROR,
75    RB_OK
76 };
77
78 void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
79 {
80    char ed1[30], ed2[30];
81    POOL_MEM msg(PM_MESSAGE);
82    int len;
83
84    len = Mmsg(msg, _("Spooling statistics:\n"));
85
86    if (spool_stats.data_jobs || spool_stats.max_data_size) {
87       len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
88          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
89          spool_stats.total_data_jobs,
90          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
91
92       sendit(msg.c_str(), len, arg);
93    }
94    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
95       len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
96          spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
97          spool_stats.total_attr_jobs,
98          edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
99    
100       sendit(msg.c_str(), len, arg);
101    }
102 }
103
104 bool begin_data_spool(DCR *dcr)
105 {
106    bool stat = true;
107    if (!dcr->dev->is_dvd() && dcr->jcr->spool_data) {
108       Dmsg0(100, "Turning on data spooling\n");
109       dcr->spool_data = true;
110       stat = open_data_spool_file(dcr);
111       if (stat) {
112          dcr->spooling = true;
113          Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
114          P(mutex);
115          spool_stats.data_jobs++;
116          V(mutex);
117       }
118    }
119    return stat;
120 }
121
122 bool discard_data_spool(DCR *dcr)
123 {
124    if (dcr->spooling) {
125       Dmsg0(100, "Data spooling discarded\n");
126       return close_data_spool_file(dcr);
127    }
128    return true;
129 }
130
131 bool commit_data_spool(DCR *dcr)
132 {
133    bool stat;
134
135    if (dcr->spooling) {
136       Dmsg0(100, "Committing spooled data\n");
137       stat = despool_data(dcr, true /*commit*/);
138       if (!stat) {
139          Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
140          close_data_spool_file(dcr);
141          return false;
142       }
143       return close_data_spool_file(dcr);
144    }
145    return true;
146 }
147
148 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
149 {
150    const char *dir;
151    if (dcr->dev->device->spool_directory) {
152       dir = dcr->dev->device->spool_directory;
153    } else {
154       dir = working_directory;
155    }
156    Mmsg(name, "%s/%s.data.%u.%s.%s.spool", dir, my_name, dcr->jcr->JobId,
157         dcr->jcr->Job, dcr->device->hdr.name);
158 }
159
160
161 static bool open_data_spool_file(DCR *dcr)
162 {
163    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
164    int spool_fd;
165
166    make_unique_data_spool_filename(dcr, &name);
167    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
168       dcr->spool_fd = spool_fd;
169       dcr->jcr->spool_attributes = true;
170    } else {
171       berrno be;
172       Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
173            be.bstrerror());
174       free_pool_memory(name);
175       return false;
176    }
177    Dmsg1(100, "Created spool file: %s\n", name);
178    free_pool_memory(name);
179    return true;
180 }
181
182 static bool close_data_spool_file(DCR *dcr)
183 {
184    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
185
186    P(mutex);
187    spool_stats.data_jobs--;
188    spool_stats.total_data_jobs++;
189    if (spool_stats.data_size < dcr->job_spool_size) {
190       spool_stats.data_size = 0;
191    } else {
192       spool_stats.data_size -= dcr->job_spool_size;
193    }
194    dcr->job_spool_size = 0;
195    V(mutex);
196
197    make_unique_data_spool_filename(dcr, &name);
198    close(dcr->spool_fd);
199    dcr->spool_fd = -1;
200    dcr->spooling = false;
201    unlink(name);
202    Dmsg1(100, "Deleted spool file: %s\n", name);
203    free_pool_memory(name);
204    return true;
205 }
206
207 static const char *spool_name = "*spool*";
208
209 /*
210  * NB! This routine locks the device, but if committing will
211  *     not unlock it. If not committing, it will be unlocked.
212  */
213 static bool despool_data(DCR *dcr, bool commit)
214 {
215    DEVICE *rdev;
216    DCR *rdcr;
217    bool ok = true;
218    DEV_BLOCK *block;
219    JCR *jcr = dcr->jcr;
220    int stat;
221    char ec1[50];
222
223    Dmsg0(100, "Despooling data\n");
224    if (jcr->dcr->job_spool_size == 0) {
225       Jmsg(jcr, M_WARNING, 0, _("Despooling zero bytes. Your disk is probably FULL!\n"));
226    }
227
228    /*
229     * Commit means that the job is done, so we commit, otherwise, we
230     *  are despooling because of user spool size max or some error  
231     *  (e.g. filesystem full).
232     */
233    if (commit) {
234       Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
235          jcr->dcr->VolumeName,
236          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
237       set_jcr_job_status(jcr, JS_DataCommitting);
238    } else {
239       Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
240          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
241       set_jcr_job_status(jcr, JS_DataDespooling);
242    }
243    set_jcr_job_status(jcr, JS_DataDespooling);
244    dir_send_job_status(jcr);
245    dcr->despool_wait = true;
246    dcr->spooling = false;
247    /*
248     * We work with device blocked, but not locked so that
249     *  other threads -- e.g. reservations can lock the device
250     *  structure.
251     */
252    dcr->dblock(BST_DESPOOLING);
253    dcr->despool_wait = false;
254    dcr->despooling = true;
255
256    /*
257     * This is really quite kludgy and should be fixed some time.
258     * We create a dev structure to read from the spool file
259     * in rdev and rdcr.
260     */
261    rdev = (DEVICE *)malloc(sizeof(DEVICE));
262    memset(rdev, 0, sizeof(DEVICE));
263    rdev->dev_name = get_memory(strlen(spool_name)+1);
264    bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
265    rdev->errmsg = get_pool_memory(PM_EMSG);
266    *rdev->errmsg = 0;
267    rdev->max_block_size = dcr->dev->max_block_size;
268    rdev->min_block_size = dcr->dev->min_block_size;
269    rdev->device = dcr->dev->device;
270    rdcr = new_dcr(jcr, NULL, rdev);
271    rdcr->spool_fd = dcr->spool_fd;
272    block = dcr->block;                /* save block */
273    dcr->block = rdcr->block;          /* make read and write block the same */
274
275    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
276    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
277
278 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
279    posix_fadvise(rdcr->spool_fd, 0, 0, POSIX_FADV_WILLNEED);
280 #endif
281
282    /* Add run time, to get current wait time */
283    int32_t despool_start = time(NULL) - jcr->run_time;
284
285    set_new_file_parameters(dcr);
286
287    for ( ; ok; ) {
288       if (job_canceled(jcr)) {
289          ok = false;
290          break;
291       }
292       stat = read_block_from_spool_file(rdcr);
293       if (stat == RB_EOT) {
294          break;
295       } else if (stat == RB_ERROR) {
296          ok = false;
297          break;
298       }
299       ok = write_block_to_device(dcr);
300       if (!ok) {
301          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
302                dcr->dev->print_name(), dcr->dev->bstrerror());
303          Dmsg2(000, "Fatal append error on device %s: ERR=%s\n",
304                dcr->dev->print_name(), dcr->dev->bstrerror());
305       }
306       Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
307    }
308
309    if (!dir_create_jobmedia_record(dcr)) {
310       Jmsg2(jcr, M_FATAL, 0, _("Could not create JobMedia record for Volume=\"%s\" Job=%s\n"),
311          dcr->VolCatInfo.VolCatName, jcr->Job);
312    }
313    /* Set new file/block parameters for current dcr */
314    set_new_file_parameters(dcr);
315
316    /*
317     * Subtracting run_time give us elapsed time - wait_time since 
318     * we started despooling. Note, don't use time_t as it is 32 or 64
319     * bits depending on the OS and doesn't edit with %d
320     */
321    int32_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
322
323    if (despool_elapsed <= 0) {
324       despool_elapsed = 1;
325    }
326
327    Jmsg(dcr->jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s Bytes/second\n"),
328          despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
329          edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
330
331    dcr->block = block;                /* reset block */
332
333    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
334    if (ftruncate(rdcr->spool_fd, 0) != 0) {
335       berrno be;
336       Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
337          be.bstrerror());
338       /* Note, try continuing despite ftruncate problem */
339    }
340
341    P(mutex);
342    if (spool_stats.data_size < dcr->job_spool_size) {
343       spool_stats.data_size = 0;
344    } else {
345       spool_stats.data_size -= dcr->job_spool_size;
346    }
347    V(mutex);
348    P(dcr->dev->spool_mutex);
349    dcr->dev->spool_size -= dcr->job_spool_size;
350    dcr->job_spool_size = 0;            /* zap size in input dcr */
351    V(dcr->dev->spool_mutex);
352    free_memory(rdev->dev_name);
353    free_pool_memory(rdev->errmsg);
354    /* Be careful to NULL the jcr and free rdev after free_dcr() */
355    rdcr->jcr = NULL;
356    rdcr->dev = NULL;
357    free_dcr(rdcr);
358    free(rdev);
359    dcr->spooling = true;           /* turn on spooling again */
360    dcr->despooling = false;
361
362    /* 
363     * We are done, so unblock the device, but if we have done a 
364     *  commit, leave it locked so that the job cleanup does not
365     *  need to wait to release the device (no re-acquire of the lock).
366     */
367    dcr->dlock();
368    unblock_device(dcr->dev);
369    /* If doing a commit, leave the device locked -- unlocked in release_device() */
370    if (!commit) {
371       dcr->dunlock();
372    }
373    set_jcr_job_status(jcr, JS_Running);
374    dir_send_job_status(jcr);
375    return ok;
376 }
377
378 /*
379  * Read a block from the spool file
380  *
381  *  Returns RB_OK on success
382  *          RB_EOT when file done
383  *          RB_ERROR on error
384  */
385 static int read_block_from_spool_file(DCR *dcr)
386 {
387    uint32_t rlen;
388    ssize_t stat;
389    spool_hdr hdr;
390    DEV_BLOCK *block = dcr->block;
391
392    rlen = sizeof(hdr);
393    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
394    if (stat == 0) {
395       Dmsg0(100, "EOT on spool read.\n");
396       return RB_EOT;
397    } else if (stat != (ssize_t)rlen) {
398       if (stat == -1) {
399          berrno be;
400          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
401               be.bstrerror());
402       } else {
403          Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
404          Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
405       }
406       return RB_ERROR;
407    }
408    rlen = hdr.len;
409    if (rlen > block->buf_len) {
410       Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
411       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
412       return RB_ERROR;
413    }
414    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
415    if (stat != (ssize_t)rlen) {
416       Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
417       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
418       return RB_ERROR;
419    }
420    /* Setup write pointers */
421    block->binbuf = rlen;
422    block->bufp = block->buf + block->binbuf;
423    block->FirstIndex = hdr.FirstIndex;
424    block->LastIndex = hdr.LastIndex;
425    block->VolSessionId = dcr->jcr->VolSessionId;
426    block->VolSessionTime = dcr->jcr->VolSessionTime;
427    Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
428    return RB_OK;
429 }
430
431 /*
432  * Write a block to the spool file
433  *
434  *  Returns: true on success or EOT
435  *           false on hard error
436  */
437 bool write_block_to_spool_file(DCR *dcr)
438 {
439    uint32_t wlen, hlen;               /* length to write */
440    bool despool = false;
441    DEV_BLOCK *block = dcr->block;
442
443    if (job_canceled(dcr->jcr)) {
444       return false;
445    }
446    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
447    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
448       return true;
449    }
450
451    hlen = sizeof(spool_hdr);
452    wlen = block->binbuf;
453    P(dcr->dev->spool_mutex);
454    dcr->job_spool_size += hlen + wlen;
455    dcr->dev->spool_size += hlen + wlen;
456    if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
457        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
458       despool = true;
459    }
460    V(dcr->dev->spool_mutex);
461    P(mutex);
462    spool_stats.data_size += hlen + wlen;
463    if (spool_stats.data_size > spool_stats.max_data_size) {
464       spool_stats.max_data_size = spool_stats.data_size;
465    }
466    V(mutex);
467    if (despool) {
468 #ifdef xDEBUG
469       char ec1[30], ec2[30], ec3[30], ec4[30];
470       Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
471             "max_job_size=%s job_size=%s\n",
472             edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
473             edit_uint64_with_commas(dcr->job_spool_size, ec2),
474             edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
475             edit_uint64_with_commas(dcr->dev->spool_size, ec4));
476 #endif
477       Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
478       if (!despool_data(dcr, false)) {
479          Pmsg0(000, _("Bad return from despool in write_block.\n"));
480          return false;
481       }
482       /* Despooling cleared these variables so reset them */
483       P(dcr->dev->spool_mutex);
484       dcr->job_spool_size += hlen + wlen;
485       dcr->dev->spool_size += hlen + wlen;
486       V(dcr->dev->spool_mutex);
487       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
488    }
489
490
491    if (!write_spool_header(dcr)) {
492       return false;
493    }
494    if (!write_spool_data(dcr)) {
495      return false;
496    }
497
498    Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
499    empty_block(block);
500    return true;
501 }
502
503 static bool write_spool_header(DCR *dcr)
504 {
505    spool_hdr hdr;
506    ssize_t stat;
507    DEV_BLOCK *block = dcr->block;
508
509    hdr.FirstIndex = block->FirstIndex;
510    hdr.LastIndex = block->LastIndex;
511    hdr.len = block->binbuf;
512
513    /* Write header */
514    for (int retry=0; retry<=1; retry++) {
515       stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
516       if (stat == -1) {
517          berrno be;
518          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
519               be.bstrerror());
520       }
521       if (stat != (ssize_t)sizeof(hdr)) {
522          Jmsg(dcr->jcr, M_ERROR, 0, _("Error writing header to spool file."
523               " Disk probably full. Attempting recovery. Wanted to write=%d got=%d\n"),
524               (int)stat, (int)sizeof(hdr));
525          /* If we wrote something, truncate it, then despool */
526          if (stat != -1) {
527 #if defined(HAVE_WIN32)
528             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
529 #else
530             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
531 #endif
532             if (ftruncate(dcr->spool_fd, pos - stat) != 0) {
533                berrno be;
534                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
535                   be.bstrerror());
536               /* Note, try continuing despite ftruncate problem */
537             }
538          }
539          if (!despool_data(dcr, false)) {
540             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
541             return false;
542          }
543          continue;                    /* try again */
544       }
545       return true;
546    }
547    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
548    return false;
549 }
550
551 static bool write_spool_data(DCR *dcr)
552 {
553    ssize_t stat;
554    DEV_BLOCK *block = dcr->block;
555
556    /* Write data */
557    for (int retry=0; retry<=1; retry++) {
558       stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
559       if (stat == -1) {
560          berrno be;
561          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
562               be.bstrerror());
563       }
564       if (stat != (ssize_t)block->binbuf) {
565          /*
566           * If we wrote something, truncate it and the header, then despool
567           */
568          if (stat != -1) {
569 #if defined(HAVE_WIN32)
570             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
571 #else
572             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
573 #endif
574             if (ftruncate(dcr->spool_fd, pos - stat - sizeof(spool_hdr)) != 0) {
575                berrno be;
576                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
577                   be.bstrerror());
578                /* Note, try continuing despite ftruncate problem */
579             }
580          }
581          if (!despool_data(dcr, false)) {
582             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
583             return false;
584          }
585          if (!write_spool_header(dcr)) {
586             return false;
587          }
588          continue;                    /* try again */
589       }
590       return true;
591    }
592    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
593    return false;
594 }
595
596
597
598 bool are_attributes_spooled(JCR *jcr)
599 {
600    return jcr->spool_attributes && jcr->dir_bsock->m_spool_fd;
601 }
602
603 /*
604  * Create spool file for attributes.
605  *  This is done by "attaching" to the bsock, and when
606  *  it is called, the output is written to a file.
607  *  The actual spooling is turned on and off in
608  *  append.c only during writing of the attributes.
609  */
610 bool begin_attribute_spool(JCR *jcr)
611 {
612    if (!jcr->no_attributes && jcr->spool_attributes) {
613       return open_attr_spool_file(jcr, jcr->dir_bsock);
614    }
615    return true;
616 }
617
618 bool discard_attribute_spool(JCR *jcr)
619 {
620    if (are_attributes_spooled(jcr)) {
621       return close_attr_spool_file(jcr, jcr->dir_bsock);
622    }
623    return true;
624 }
625
626 static void update_attr_spool_size(ssize_t size)
627 {
628    P(mutex);
629    if (size > 0) {
630      if ((spool_stats.attr_size - size) > 0) {
631         spool_stats.attr_size -= size;
632      } else {
633         spool_stats.attr_size = 0;
634      }
635    }
636    V(mutex);
637 }
638
639 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
640 {
641    Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
642       jcr->Job, fd);
643 }
644
645 /*
646  * Tell Director where to find the attributes spool file 
647  *  Note, if we are not on the same machine, the Director will
648  *  return an error, and the higher level routine will transmit
649  *  the data record by record -- using bsock->despool().
650  */
651 static bool blast_attr_spool_file(JCR *jcr, boffset_t size)
652 {
653    /* send full spool file name */
654    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
655    make_unique_spool_filename(jcr, &name, jcr->dir_bsock->m_fd);
656    bash_spaces(name);
657    jcr->dir_bsock->fsend("BlastAttr Job=%s File=%s\n", jcr->Job, name);
658    free_pool_memory(name);
659    
660    if (jcr->dir_bsock->recv() <= 0) {
661       Jmsg(jcr, M_FATAL, 0, _("Network error on BlastAttributes.\n"));
662       return false;
663    }
664    
665    if (!bstrcmp(jcr->dir_bsock->msg, "1000 OK BlastAttr\n")) {
666       return false;
667    }
668    return true;
669 }
670
671 bool commit_attribute_spool(JCR *jcr)
672 {
673    boffset_t size;
674    char ec1[30];
675    char tbuf[100];
676
677    Dmsg1(100, "Commit attributes at %s\n", bstrftimes(tbuf, sizeof(tbuf),
678          (utime_t)time(NULL)));
679    if (are_attributes_spooled(jcr)) {
680       if (fseeko(jcr->dir_bsock->m_spool_fd, 0, SEEK_END) != 0) {
681          berrno be;
682          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
683               be.bstrerror());
684          goto bail_out;
685       }
686       size = ftello(jcr->dir_bsock->m_spool_fd);
687       if (size < 0) {
688          berrno be;
689          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
690               be.bstrerror());
691          goto bail_out;
692       }
693       P(mutex);
694       if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
695          spool_stats.max_attr_size = spool_stats.attr_size + size;
696       }
697       spool_stats.attr_size += size;
698       V(mutex);
699       set_jcr_job_status(jcr, JS_AttrDespooling);
700       dir_send_job_status(jcr);
701       Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
702             edit_uint64_with_commas(size, ec1));
703
704       if (!blast_attr_spool_file(jcr, size)) {
705          /* Can't read spool file from director side,
706           * send content over network.
707           */
708          jcr->dir_bsock->despool(update_attr_spool_size, size);
709       }
710       return close_attr_spool_file(jcr, jcr->dir_bsock);
711    }
712    return true;
713
714 bail_out:
715    close_attr_spool_file(jcr, jcr->dir_bsock);
716    return false;
717 }
718
719 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
720 {
721    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
722
723    make_unique_spool_filename(jcr, &name, bs->m_fd);
724    bs->m_spool_fd = fopen(name, "w+b");
725    if (!bs->m_spool_fd) {
726       berrno be;
727       Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
728            be.bstrerror());
729       free_pool_memory(name);
730       return false;
731    }
732    P(mutex);
733    spool_stats.attr_jobs++;
734    V(mutex);
735    free_pool_memory(name);
736    return true;
737 }
738
739 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
740 {
741    POOLMEM *name;
742
743    char tbuf[100];
744
745    Dmsg1(100, "Close attr spool file at %s\n", bstrftimes(tbuf, sizeof(tbuf),
746          (utime_t)time(NULL)));
747    if (!bs->m_spool_fd) {
748       return true;
749    }
750    name = get_pool_memory(PM_MESSAGE);
751    P(mutex);
752    spool_stats.attr_jobs--;
753    spool_stats.total_attr_jobs++;
754    V(mutex);
755    make_unique_spool_filename(jcr, &name, bs->m_fd);
756    fclose(bs->m_spool_fd);
757    unlink(name);
758    free_pool_memory(name);
759    bs->m_spool_fd = NULL;
760    bs->clear_spooling();
761    return true;
762 }