]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
Add debug code for FreeBSD regress failures
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2004-2009 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  *  Spooling code
30  *
31  *      Kern Sibbald, March 2004
32  *
33  *  Version $Id$
34  */
35
36 #include "bacula.h"
37 #include "stored.h"
38
39 /* Forward referenced subroutines */
40 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
41 static bool open_data_spool_file(DCR *dcr);
42 static bool close_data_spool_file(DCR *dcr);
43 static bool despool_data(DCR *dcr, bool commit);
44 static int  read_block_from_spool_file(DCR *dcr);
45 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
46 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
47 static bool write_spool_header(DCR *dcr);
48 static bool write_spool_data(DCR *dcr);
49
50 struct spool_stats_t {
51    uint32_t data_jobs;                /* current jobs spooling data */
52    uint32_t attr_jobs;
53    uint32_t total_data_jobs;          /* total jobs to have spooled data */
54    uint32_t total_attr_jobs;
55    int64_t max_data_size;             /* max data size */
56    int64_t max_attr_size;
57    int64_t data_size;                 /* current data size (all jobs running) */
58    int64_t attr_size;
59 };
60
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62 spool_stats_t spool_stats;
63
64 /*
65  * Header for data spool record */
66 struct spool_hdr {
67    int32_t  FirstIndex;               /* FirstIndex for buffer */
68    int32_t  LastIndex;                /* LastIndex for buffer */
69    uint32_t len;                      /* length of next buffer */
70 };
71
72 enum {
73    RB_EOT = 1,
74    RB_ERROR,
75    RB_OK
76 };
77
78 void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
79 {
80    char ed1[30], ed2[30];
81    POOL_MEM msg(PM_MESSAGE);
82    int len;
83
84    len = Mmsg(msg, _("Spooling statistics:\n"));
85
86    if (spool_stats.data_jobs || spool_stats.max_data_size) {
87       len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
88          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
89          spool_stats.total_data_jobs,
90          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
91
92       sendit(msg.c_str(), len, arg);
93    }
94    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
95       len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
96          spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
97          spool_stats.total_attr_jobs,
98          edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
99    
100       sendit(msg.c_str(), len, arg);
101    }
102 }
103
104 bool begin_data_spool(DCR *dcr)
105 {
106    bool stat = true;
107    if (!dcr->dev->is_dvd() && dcr->jcr->spool_data) {
108       Dmsg0(100, "Turning on data spooling\n");
109       dcr->spool_data = true;
110       stat = open_data_spool_file(dcr);
111       if (stat) {
112          dcr->spooling = true;
113          Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
114          P(mutex);
115          spool_stats.data_jobs++;
116          V(mutex);
117       }
118    }
119    return stat;
120 }
121
122 bool discard_data_spool(DCR *dcr)
123 {
124    if (dcr->spooling) {
125       Dmsg0(100, "Data spooling discarded\n");
126       return close_data_spool_file(dcr);
127    }
128    return true;
129 }
130
131 bool commit_data_spool(DCR *dcr)
132 {
133    bool stat;
134
135    if (dcr->spooling) {
136       Dmsg0(100, "Committing spooled data\n");
137       stat = despool_data(dcr, true /*commit*/);
138       if (!stat) {
139          Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
140          close_data_spool_file(dcr);
141          return false;
142       }
143       return close_data_spool_file(dcr);
144    }
145    return true;
146 }
147
148 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
149 {
150    const char *dir;
151    if (dcr->dev->device->spool_directory) {
152       dir = dcr->dev->device->spool_directory;
153    } else {
154       dir = working_directory;
155    }
156    Mmsg(name, "%s/%s.data.%u.%s.%s.spool", dir, my_name, dcr->jcr->JobId,
157         dcr->jcr->Job, dcr->device->hdr.name);
158 }
159
160
161 static bool open_data_spool_file(DCR *dcr)
162 {
163    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
164    int spool_fd;
165
166    make_unique_data_spool_filename(dcr, &name);
167    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
168       dcr->spool_fd = spool_fd;
169       dcr->jcr->spool_attributes = true;
170    } else {
171       berrno be;
172       Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
173            be.bstrerror());
174       free_pool_memory(name);
175       return false;
176    }
177    Dmsg1(100, "Created spool file: %s\n", name);
178    free_pool_memory(name);
179    return true;
180 }
181
182 static bool close_data_spool_file(DCR *dcr)
183 {
184    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
185
186    P(mutex);
187    spool_stats.data_jobs--;
188    spool_stats.total_data_jobs++;
189    if (spool_stats.data_size < dcr->job_spool_size) {
190       spool_stats.data_size = 0;
191    } else {
192       spool_stats.data_size -= dcr->job_spool_size;
193    }
194    dcr->job_spool_size = 0;
195    V(mutex);
196
197    make_unique_data_spool_filename(dcr, &name);
198    close(dcr->spool_fd);
199    dcr->spool_fd = -1;
200    dcr->spooling = false;
201    unlink(name);
202    Dmsg1(100, "Deleted spool file: %s\n", name);
203    free_pool_memory(name);
204    return true;
205 }
206
207 static const char *spool_name = "*spool*";
208
209 /*
210  * NB! This routine locks the device, but if committing will
211  *     not unlock it. If not committing, it will be unlocked.
212  */
213 static bool despool_data(DCR *dcr, bool commit)
214 {
215    DEVICE *rdev;
216    DCR *rdcr;
217    bool ok = true;
218    DEV_BLOCK *block;
219    JCR *jcr = dcr->jcr;
220    int stat;
221    char ec1[50];
222
223    Dmsg0(100, "Despooling data\n");
224    if (jcr->dcr->job_spool_size == 0) {
225       Jmsg(jcr, M_WARNING, 0, _("Despooling zero bytes. Your disk is probably FULL!\n"));
226    }
227
228    /*
229     * Commit means that the job is done, so we commit, otherwise, we
230     *  are despooling because of user spool size max or some error  
231     *  (e.g. filesystem full).
232     */
233    if (commit) {
234       Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
235          jcr->dcr->VolumeName,
236          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
237       set_jcr_job_status(jcr, JS_DataCommitting);
238    } else {
239       Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
240          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
241       set_jcr_job_status(jcr, JS_DataDespooling);
242    }
243    set_jcr_job_status(jcr, JS_DataDespooling);
244    dir_send_job_status(jcr);
245    dcr->despool_wait = true;
246    dcr->spooling = false;
247    /*
248     * We work with device blocked, but not locked so that
249     *  other threads -- e.g. reservations can lock the device
250     *  structure.
251     */
252    dcr->dblock(BST_DESPOOLING);
253    dcr->despool_wait = false;
254    dcr->despooling = true;
255
256    /*
257     * This is really quite kludgy and should be fixed some time.
258     * We create a dev structure to read from the spool file
259     * in rdev and rdcr.
260     */
261    rdev = (DEVICE *)malloc(sizeof(DEVICE));
262    memset(rdev, 0, sizeof(DEVICE));
263    rdev->dev_name = get_memory(strlen(spool_name)+1);
264    bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
265    rdev->errmsg = get_pool_memory(PM_EMSG);
266    *rdev->errmsg = 0;
267    rdev->max_block_size = dcr->dev->max_block_size;
268    rdev->min_block_size = dcr->dev->min_block_size;
269    rdev->device = dcr->dev->device;
270    rdcr = new_dcr(jcr, NULL, rdev);
271    rdcr->spool_fd = dcr->spool_fd;
272    block = dcr->block;                /* save block */
273    dcr->block = rdcr->block;          /* make read and write block the same */
274
275    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
276    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
277
278 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
279    posix_fadvise(rdcr->spool_fd, 0, 0, POSIX_FADV_WILLNEED);
280 #endif
281
282    /* Add run time, to get current wait time */
283    int32_t despool_start = time(NULL) - jcr->run_time;
284
285    set_new_file_parameters(dcr);
286
287    for ( ; ok; ) {
288       if (job_canceled(jcr)) {
289          ok = false;
290          break;
291       }
292       stat = read_block_from_spool_file(rdcr);
293       if (stat == RB_EOT) {
294          break;
295       } else if (stat == RB_ERROR) {
296          ok = false;
297          break;
298       }
299       ok = write_block_to_device(dcr);
300       if (!ok) {
301          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
302                dcr->dev->print_name(), dcr->dev->bstrerror());
303          Dmsg2(000, "Fatal append error on device %s: ERR=%s\n",
304                dcr->dev->print_name(), dcr->dev->bstrerror());
305       }
306       Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
307    }
308
309    if (!dir_create_jobmedia_record(dcr)) {
310       Jmsg2(jcr, M_FATAL, 0, _("Could not create JobMedia record for Volume=\"%s\" Job=%s\n"),
311          dcr->VolCatInfo.VolCatName, jcr->Job);
312    }
313    /* Set new file/block parameters for current dcr */
314    set_new_file_parameters(dcr);
315
316    /*
317     * Subtracting run_time give us elapsed time - wait_time since 
318     * we started despooling. Note, don't use time_t as it is 32 or 64
319     * bits depending on the OS and doesn't edit with %d
320     */
321    int32_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
322
323    if (despool_elapsed <= 0) {
324       despool_elapsed = 1;
325    }
326
327    Jmsg(dcr->jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s Bytes/second\n"),
328          despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
329          edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
330
331    dcr->block = block;                /* reset block */
332
333    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
334    if (ftruncate(rdcr->spool_fd, 0) != 0) {
335       berrno be;
336       Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
337          be.bstrerror());
338       /* Note, try continuing despite ftruncate problem */
339    }
340
341    P(mutex);
342    if (spool_stats.data_size < dcr->job_spool_size) {
343       spool_stats.data_size = 0;
344    } else {
345       spool_stats.data_size -= dcr->job_spool_size;
346    }
347    V(mutex);
348    P(dcr->dev->spool_mutex);
349    dcr->dev->spool_size -= dcr->job_spool_size;
350    dcr->job_spool_size = 0;            /* zap size in input dcr */
351    V(dcr->dev->spool_mutex);
352    free_memory(rdev->dev_name);
353    free_pool_memory(rdev->errmsg);
354    /* Be careful to NULL the jcr and free rdev after free_dcr() */
355    rdcr->jcr = NULL;
356    rdcr->dev = NULL;
357    free_dcr(rdcr);
358    free(rdev);
359    dcr->spooling = true;           /* turn on spooling again */
360    dcr->despooling = false;
361    /*
362     * Note, if committing we leave the device blocked. It will be removed in
363     *  release_device();
364     */
365    if (!commit) {
366       dcr->dev->dunblock();
367    }
368    set_jcr_job_status(jcr, JS_Running);
369    dir_send_job_status(jcr);
370    return ok;
371 }
372
373 /*
374  * Read a block from the spool file
375  *
376  *  Returns RB_OK on success
377  *          RB_EOT when file done
378  *          RB_ERROR on error
379  */
380 static int read_block_from_spool_file(DCR *dcr)
381 {
382    uint32_t rlen;
383    ssize_t stat;
384    spool_hdr hdr;
385    DEV_BLOCK *block = dcr->block;
386
387    rlen = sizeof(hdr);
388    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
389    if (stat == 0) {
390       Dmsg0(100, "EOT on spool read.\n");
391       return RB_EOT;
392    } else if (stat != (ssize_t)rlen) {
393       if (stat == -1) {
394          berrno be;
395          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
396               be.bstrerror());
397       } else {
398          Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
399          Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
400       }
401       return RB_ERROR;
402    }
403    rlen = hdr.len;
404    if (rlen > block->buf_len) {
405       Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
406       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
407       return RB_ERROR;
408    }
409    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
410    if (stat != (ssize_t)rlen) {
411       Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
412       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
413       return RB_ERROR;
414    }
415    /* Setup write pointers */
416    block->binbuf = rlen;
417    block->bufp = block->buf + block->binbuf;
418    block->FirstIndex = hdr.FirstIndex;
419    block->LastIndex = hdr.LastIndex;
420    block->VolSessionId = dcr->jcr->VolSessionId;
421    block->VolSessionTime = dcr->jcr->VolSessionTime;
422    Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
423    return RB_OK;
424 }
425
426 /*
427  * Write a block to the spool file
428  *
429  *  Returns: true on success or EOT
430  *           false on hard error
431  */
432 bool write_block_to_spool_file(DCR *dcr)
433 {
434    uint32_t wlen, hlen;               /* length to write */
435    bool despool = false;
436    DEV_BLOCK *block = dcr->block;
437
438    if (job_canceled(dcr->jcr)) {
439       return false;
440    }
441    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
442    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
443       return true;
444    }
445
446    hlen = sizeof(spool_hdr);
447    wlen = block->binbuf;
448    P(dcr->dev->spool_mutex);
449    dcr->job_spool_size += hlen + wlen;
450    dcr->dev->spool_size += hlen + wlen;
451    if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
452        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
453       despool = true;
454    }
455    V(dcr->dev->spool_mutex);
456    P(mutex);
457    spool_stats.data_size += hlen + wlen;
458    if (spool_stats.data_size > spool_stats.max_data_size) {
459       spool_stats.max_data_size = spool_stats.data_size;
460    }
461    V(mutex);
462    if (despool) {
463 #ifdef xDEBUG
464       char ec1[30], ec2[30], ec3[30], ec4[30];
465       Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
466             "max_job_size=%s job_size=%s\n",
467             edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
468             edit_uint64_with_commas(dcr->job_spool_size, ec2),
469             edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
470             edit_uint64_with_commas(dcr->dev->spool_size, ec4));
471 #endif
472       Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
473       if (!despool_data(dcr, false)) {
474          Pmsg0(000, _("Bad return from despool in write_block.\n"));
475          return false;
476       }
477       /* Despooling cleared these variables so reset them */
478       P(dcr->dev->spool_mutex);
479       dcr->job_spool_size += hlen + wlen;
480       dcr->dev->spool_size += hlen + wlen;
481       V(dcr->dev->spool_mutex);
482       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
483    }
484
485
486    if (!write_spool_header(dcr)) {
487       return false;
488    }
489    if (!write_spool_data(dcr)) {
490      return false;
491    }
492
493    Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
494    empty_block(block);
495    return true;
496 }
497
498 static bool write_spool_header(DCR *dcr)
499 {
500    spool_hdr hdr;
501    ssize_t stat;
502    DEV_BLOCK *block = dcr->block;
503
504    hdr.FirstIndex = block->FirstIndex;
505    hdr.LastIndex = block->LastIndex;
506    hdr.len = block->binbuf;
507
508    /* Write header */
509    for (int retry=0; retry<=1; retry++) {
510       stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
511       if (stat == -1) {
512          berrno be;
513          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
514               be.bstrerror());
515       }
516       if (stat != (ssize_t)sizeof(hdr)) {
517          Jmsg(dcr->jcr, M_ERROR, 0, _("Error writing header to spool file."
518               " Disk probably full. Attempting recovery. Wanted to write=%d got=%d\n"),
519               (int)stat, (int)sizeof(hdr));
520          /* If we wrote something, truncate it, then despool */
521          if (stat != -1) {
522 #if defined(HAVE_WIN32)
523             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
524 #else
525             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
526 #endif
527             if (ftruncate(dcr->spool_fd, pos - stat) != 0) {
528                berrno be;
529                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
530                   be.bstrerror());
531               /* Note, try continuing despite ftruncate problem */
532             }
533          }
534          if (!despool_data(dcr, false)) {
535             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
536             return false;
537          }
538          continue;                    /* try again */
539       }
540       return true;
541    }
542    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
543    return false;
544 }
545
546 static bool write_spool_data(DCR *dcr)
547 {
548    ssize_t stat;
549    DEV_BLOCK *block = dcr->block;
550
551    /* Write data */
552    for (int retry=0; retry<=1; retry++) {
553       stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
554       if (stat == -1) {
555          berrno be;
556          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
557               be.bstrerror());
558       }
559       if (stat != (ssize_t)block->binbuf) {
560          /*
561           * If we wrote something, truncate it and the header, then despool
562           */
563          if (stat != -1) {
564 #if defined(HAVE_WIN32)
565             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
566 #else
567             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
568 #endif
569             if (ftruncate(dcr->spool_fd, pos - stat - sizeof(spool_hdr)) != 0) {
570                berrno be;
571                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
572                   be.bstrerror());
573                /* Note, try continuing despite ftruncate problem */
574             }
575          }
576          if (!despool_data(dcr, false)) {
577             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
578             return false;
579          }
580          if (!write_spool_header(dcr)) {
581             return false;
582          }
583          continue;                    /* try again */
584       }
585       return true;
586    }
587    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
588    return false;
589 }
590
591
592
593 bool are_attributes_spooled(JCR *jcr)
594 {
595    return jcr->spool_attributes && jcr->dir_bsock->m_spool_fd;
596 }
597
598 /*
599  * Create spool file for attributes.
600  *  This is done by "attaching" to the bsock, and when
601  *  it is called, the output is written to a file.
602  *  The actual spooling is turned on and off in
603  *  append.c only during writing of the attributes.
604  */
605 bool begin_attribute_spool(JCR *jcr)
606 {
607    if (!jcr->no_attributes && jcr->spool_attributes) {
608       return open_attr_spool_file(jcr, jcr->dir_bsock);
609    }
610    return true;
611 }
612
613 bool discard_attribute_spool(JCR *jcr)
614 {
615    if (are_attributes_spooled(jcr)) {
616       return close_attr_spool_file(jcr, jcr->dir_bsock);
617    }
618    return true;
619 }
620
621 static void update_attr_spool_size(ssize_t size)
622 {
623    P(mutex);
624    if (size > 0) {
625      if ((spool_stats.attr_size - size) > 0) {
626         spool_stats.attr_size -= size;
627      } else {
628         spool_stats.attr_size = 0;
629      }
630    }
631    V(mutex);
632 }
633
634 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
635 {
636    Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
637       jcr->Job, fd);
638 }
639
640 /*
641  * Tell Director where to find the attributes spool file 
642  *  Note, if we are not on the same machine, the Director will
643  *  return an error, and the higher level routine will transmit
644  *  the data record by record -- using bsock->despool().
645  */
646 static bool blast_attr_spool_file(JCR *jcr, boffset_t size)
647 {
648    /* send full spool file name */
649    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
650    make_unique_spool_filename(jcr, &name, jcr->dir_bsock->m_fd);
651    bash_spaces(name);
652    jcr->dir_bsock->fsend("BlastAttr Job=%s File=%s\n", jcr->Job, name);
653    free_pool_memory(name);
654    
655    if (jcr->dir_bsock->recv() <= 0) {
656       Jmsg(jcr, M_FATAL, 0, _("Network error on BlastAttributes.\n"));
657       return false;
658    }
659    
660    if (!bstrcmp(jcr->dir_bsock->msg, "1000 OK BlastAttr\n")) {
661       return false;
662    }
663    return true;
664 }
665
666 bool commit_attribute_spool(JCR *jcr)
667 {
668    boffset_t size;
669    char ec1[30];
670    char tbuf[100];
671
672    Dmsg1(100, "Commit attributes at %s\n", bstrftimes(tbuf, sizeof(tbuf),
673          (utime_t)time(NULL)));
674    if (are_attributes_spooled(jcr)) {
675       if (fseeko(jcr->dir_bsock->m_spool_fd, 0, SEEK_END) != 0) {
676          berrno be;
677          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
678               be.bstrerror());
679          goto bail_out;
680       }
681       size = ftello(jcr->dir_bsock->m_spool_fd);
682       if (size < 0) {
683          berrno be;
684          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
685               be.bstrerror());
686          goto bail_out;
687       }
688       P(mutex);
689       if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
690          spool_stats.max_attr_size = spool_stats.attr_size + size;
691       }
692       spool_stats.attr_size += size;
693       V(mutex);
694       set_jcr_job_status(jcr, JS_AttrDespooling);
695       dir_send_job_status(jcr);
696       Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
697             edit_uint64_with_commas(size, ec1));
698
699       if (!blast_attr_spool_file(jcr, size)) {
700          /* Can't read spool file from director side,
701           * send content over network.
702           */
703          jcr->dir_bsock->despool(update_attr_spool_size, size);
704       }
705       return close_attr_spool_file(jcr, jcr->dir_bsock);
706    }
707    return true;
708
709 bail_out:
710    close_attr_spool_file(jcr, jcr->dir_bsock);
711    return false;
712 }
713
714 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
715 {
716    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
717
718    make_unique_spool_filename(jcr, &name, bs->m_fd);
719    bs->m_spool_fd = fopen(name, "w+b");
720    if (!bs->m_spool_fd) {
721       berrno be;
722       Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
723            be.bstrerror());
724       free_pool_memory(name);
725       return false;
726    }
727    P(mutex);
728    spool_stats.attr_jobs++;
729    V(mutex);
730    free_pool_memory(name);
731    return true;
732 }
733
734 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
735 {
736    POOLMEM *name;
737
738    char tbuf[100];
739
740    Dmsg1(100, "Close attr spool file at %s\n", bstrftimes(tbuf, sizeof(tbuf),
741          (utime_t)time(NULL)));
742    if (!bs->m_spool_fd) {
743       return true;
744    }
745    name = get_pool_memory(PM_MESSAGE);
746    P(mutex);
747    spool_stats.attr_jobs--;
748    spool_stats.total_attr_jobs++;
749    V(mutex);
750    make_unique_spool_filename(jcr, &name, bs->m_fd);
751    fclose(bs->m_spool_fd);
752    unlink(name);
753    free_pool_memory(name);
754    bs->m_spool_fd = NULL;
755    bs->clear_spooling();
756    return true;
757 }