]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
Add additional mysql connection debug code
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2004-2009 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  *  Spooling code
30  *
31  *      Kern Sibbald, March 2004
32  *
33  *  Version $Id$
34  */
35
36 #include "bacula.h"
37 #include "stored.h"
38
39 /* Forward referenced subroutines */
40 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
41 static bool open_data_spool_file(DCR *dcr);
42 static bool close_data_spool_file(DCR *dcr);
43 static bool despool_data(DCR *dcr, bool commit);
44 static int  read_block_from_spool_file(DCR *dcr);
45 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
46 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
47 static bool write_spool_header(DCR *dcr);
48 static bool write_spool_data(DCR *dcr);
49
50 struct spool_stats_t {
51    uint32_t data_jobs;                /* current jobs spooling data */
52    uint32_t attr_jobs;
53    uint32_t total_data_jobs;          /* total jobs to have spooled data */
54    uint32_t total_attr_jobs;
55    int64_t max_data_size;             /* max data size */
56    int64_t max_attr_size;
57    int64_t data_size;                 /* current data size (all jobs running) */
58    int64_t attr_size;
59 };
60
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62 spool_stats_t spool_stats;
63
64 /*
65  * Header for data spool record */
66 struct spool_hdr {
67    int32_t  FirstIndex;               /* FirstIndex for buffer */
68    int32_t  LastIndex;                /* LastIndex for buffer */
69    uint32_t len;                      /* length of next buffer */
70 };
71
72 enum {
73    RB_EOT = 1,
74    RB_ERROR,
75    RB_OK
76 };
77
78 void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
79 {
80    char ed1[30], ed2[30];
81    POOL_MEM msg(PM_MESSAGE);
82    int len;
83
84    len = Mmsg(msg, _("Spooling statistics:\n"));
85
86    if (spool_stats.data_jobs || spool_stats.max_data_size) {
87       len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
88          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
89          spool_stats.total_data_jobs,
90          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
91
92       sendit(msg.c_str(), len, arg);
93    }
94    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
95       len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
96          spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
97          spool_stats.total_attr_jobs,
98          edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
99    
100       sendit(msg.c_str(), len, arg);
101    }
102 }
103
104 bool begin_data_spool(DCR *dcr)
105 {
106    bool stat = true;
107    if (!dcr->dev->is_dvd() && dcr->jcr->spool_data) {
108       Dmsg0(100, "Turning on data spooling\n");
109       dcr->spool_data = true;
110       stat = open_data_spool_file(dcr);
111       if (stat) {
112          dcr->spooling = true;
113          Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
114          P(mutex);
115          spool_stats.data_jobs++;
116          V(mutex);
117       }
118    }
119    return stat;
120 }
121
122 bool discard_data_spool(DCR *dcr)
123 {
124    if (dcr->spooling) {
125       Dmsg0(100, "Data spooling discarded\n");
126       return close_data_spool_file(dcr);
127    }
128    return true;
129 }
130
131 bool commit_data_spool(DCR *dcr)
132 {
133    bool stat;
134
135    if (dcr->spooling) {
136       Dmsg0(100, "Committing spooled data\n");
137       stat = despool_data(dcr, true /*commit*/);
138       if (!stat) {
139          Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
140          close_data_spool_file(dcr);
141          return false;
142       }
143       return close_data_spool_file(dcr);
144    }
145    return true;
146 }
147
148 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
149 {
150    const char *dir;
151    if (dcr->dev->device->spool_directory) {
152       dir = dcr->dev->device->spool_directory;
153    } else {
154       dir = working_directory;
155    }
156    Mmsg(name, "%s/%s.data.%u.%s.%s.spool", dir, my_name, dcr->jcr->JobId,
157         dcr->jcr->Job, dcr->device->hdr.name);
158 }
159
160
161 static bool open_data_spool_file(DCR *dcr)
162 {
163    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
164    int spool_fd;
165
166    make_unique_data_spool_filename(dcr, &name);
167    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
168       dcr->spool_fd = spool_fd;
169       dcr->jcr->spool_attributes = true;
170    } else {
171       berrno be;
172       Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
173            be.bstrerror());
174       free_pool_memory(name);
175       return false;
176    }
177    Dmsg1(100, "Created spool file: %s\n", name);
178    free_pool_memory(name);
179    return true;
180 }
181
182 static bool close_data_spool_file(DCR *dcr)
183 {
184    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
185
186    P(mutex);
187    spool_stats.data_jobs--;
188    spool_stats.total_data_jobs++;
189    if (spool_stats.data_size < dcr->job_spool_size) {
190       spool_stats.data_size = 0;
191    } else {
192       spool_stats.data_size -= dcr->job_spool_size;
193    }
194    dcr->job_spool_size = 0;
195    V(mutex);
196
197    make_unique_data_spool_filename(dcr, &name);
198    close(dcr->spool_fd);
199    dcr->spool_fd = -1;
200    dcr->spooling = false;
201    unlink(name);
202    Dmsg1(100, "Deleted spool file: %s\n", name);
203    free_pool_memory(name);
204    return true;
205 }
206
207 static const char *spool_name = "*spool*";
208
209 /*
210  * NB! This routine locks the device, but if committing will
211  *     not unlock it. If not committing, it will be unlocked.
212  */
213 static bool despool_data(DCR *dcr, bool commit)
214 {
215    DEVICE *rdev;
216    DCR *rdcr;
217    bool ok = true;
218    DEV_BLOCK *block;
219    JCR *jcr = dcr->jcr;
220    int stat;
221    char ec1[50];
222
223    Dmsg0(100, "Despooling data\n");
224    /*
225     * Commit means that the job is done, so we commit, otherwise, we
226     *  are despooling because of user spool size max or some error  
227     *  (e.g. filesystem full).
228     */
229    if (commit) {
230       Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
231          jcr->dcr->VolumeName,
232          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
233       set_jcr_job_status(jcr, JS_DataCommitting);
234    } else {
235       Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
236          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
237       set_jcr_job_status(jcr, JS_DataDespooling);
238    }
239    set_jcr_job_status(jcr, JS_DataDespooling);
240    dir_send_job_status(jcr);
241    dcr->despool_wait = true;
242    dcr->spooling = false;
243    /*
244     * We work with device blocked, but not locked so that
245     *  other threads -- e.g. reservations can lock the device
246     *  structure.
247     */
248    dcr->dblock(BST_DESPOOLING);
249    dcr->despool_wait = false;
250    dcr->despooling = true;
251
252    /*
253     * This is really quite kludgy and should be fixed some time.
254     * We create a dev structure to read from the spool file
255     * in rdev and rdcr.
256     */
257    rdev = (DEVICE *)malloc(sizeof(DEVICE));
258    memset(rdev, 0, sizeof(DEVICE));
259    rdev->dev_name = get_memory(strlen(spool_name)+1);
260    bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
261    rdev->errmsg = get_pool_memory(PM_EMSG);
262    *rdev->errmsg = 0;
263    rdev->max_block_size = dcr->dev->max_block_size;
264    rdev->min_block_size = dcr->dev->min_block_size;
265    rdev->device = dcr->dev->device;
266    rdcr = new_dcr(jcr, NULL, rdev);
267    rdcr->spool_fd = dcr->spool_fd;
268    block = dcr->block;                /* save block */
269    dcr->block = rdcr->block;          /* make read and write block the same */
270
271    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
272    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
273
274 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
275    posix_fadvise(rdcr->spool_fd, 0, 0, POSIX_FADV_WILLNEED);
276 #endif
277
278    /* Add run time, to get current wait time */
279    int32_t despool_start = time(NULL) - jcr->run_time;
280
281    set_new_file_parameters(dcr);
282
283    for ( ; ok; ) {
284       if (job_canceled(jcr)) {
285          ok = false;
286          break;
287       }
288       stat = read_block_from_spool_file(rdcr);
289       if (stat == RB_EOT) {
290          break;
291       } else if (stat == RB_ERROR) {
292          ok = false;
293          break;
294       }
295       ok = write_block_to_device(dcr);
296       if (!ok) {
297          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
298                dcr->dev->print_name(), dcr->dev->bstrerror());
299          Dmsg2(000, "Fatal append error on device %s: ERR=%s\n",
300                dcr->dev->print_name(), dcr->dev->bstrerror());
301       }
302       Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
303    }
304
305    if (!dir_create_jobmedia_record(dcr)) {
306       Jmsg2(jcr, M_FATAL, 0, _("Could not create JobMedia record for Volume=\"%s\" Job=%s\n"),
307          dcr->VolCatInfo.VolCatName, jcr->Job);
308    }
309    /* Set new file/block parameters for current dcr */
310    set_new_file_parameters(dcr);
311
312    /*
313     * Subtracting run_time give us elapsed time - wait_time since 
314     * we started despooling. Note, don't use time_t as it is 32 or 64
315     * bits depending on the OS and doesn't edit with %d
316     */
317    int32_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
318
319    if (despool_elapsed <= 0) {
320       despool_elapsed = 1;
321    }
322
323    Jmsg(dcr->jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s bytes/second\n"),
324          despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
325          edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
326
327    dcr->block = block;                /* reset block */
328
329    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
330    if (ftruncate(rdcr->spool_fd, 0) != 0) {
331       berrno be;
332       Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
333          be.bstrerror());
334       /* Note, try continuing despite ftruncate problem */
335    }
336
337    P(mutex);
338    if (spool_stats.data_size < dcr->job_spool_size) {
339       spool_stats.data_size = 0;
340    } else {
341       spool_stats.data_size -= dcr->job_spool_size;
342    }
343    V(mutex);
344    P(dcr->dev->spool_mutex);
345    dcr->dev->spool_size -= dcr->job_spool_size;
346    dcr->job_spool_size = 0;            /* zap size in input dcr */
347    V(dcr->dev->spool_mutex);
348    free_memory(rdev->dev_name);
349    free_pool_memory(rdev->errmsg);
350    /* Be careful to NULL the jcr and free rdev after free_dcr() */
351    rdcr->jcr = NULL;
352    rdcr->dev = NULL;
353    free_dcr(rdcr);
354    free(rdev);
355    dcr->spooling = true;           /* turn on spooling again */
356    dcr->despooling = false;
357
358    /* 
359     * We are done, so unblock the device, but if we have done a 
360     *  commit, leave it locked so that the job cleanup does not
361     *  need to wait to release the device (no re-acquire of the lock).
362     */
363    dcr->dlock();
364    unblock_device(dcr->dev);
365    /* If doing a commit, leave the device locked -- unlocked in release_device() */
366    if (!commit) {
367       dcr->dunlock();
368    }
369    set_jcr_job_status(jcr, JS_Running);
370    dir_send_job_status(jcr);
371    return ok;
372 }
373
374 /*
375  * Read a block from the spool file
376  *
377  *  Returns RB_OK on success
378  *          RB_EOT when file done
379  *          RB_ERROR on error
380  */
381 static int read_block_from_spool_file(DCR *dcr)
382 {
383    uint32_t rlen;
384    ssize_t stat;
385    spool_hdr hdr;
386    DEV_BLOCK *block = dcr->block;
387
388    rlen = sizeof(hdr);
389    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
390    if (stat == 0) {
391       Dmsg0(100, "EOT on spool read.\n");
392       return RB_EOT;
393    } else if (stat != (ssize_t)rlen) {
394       if (stat == -1) {
395          berrno be;
396          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
397               be.bstrerror());
398       } else {
399          Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
400          Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
401       }
402       return RB_ERROR;
403    }
404    rlen = hdr.len;
405    if (rlen > block->buf_len) {
406       Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
407       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
408       return RB_ERROR;
409    }
410    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
411    if (stat != (ssize_t)rlen) {
412       Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
413       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
414       return RB_ERROR;
415    }
416    /* Setup write pointers */
417    block->binbuf = rlen;
418    block->bufp = block->buf + block->binbuf;
419    block->FirstIndex = hdr.FirstIndex;
420    block->LastIndex = hdr.LastIndex;
421    block->VolSessionId = dcr->jcr->VolSessionId;
422    block->VolSessionTime = dcr->jcr->VolSessionTime;
423    Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
424    return RB_OK;
425 }
426
427 /*
428  * Write a block to the spool file
429  *
430  *  Returns: true on success or EOT
431  *           false on hard error
432  */
433 bool write_block_to_spool_file(DCR *dcr)
434 {
435    uint32_t wlen, hlen;               /* length to write */
436    bool despool = false;
437    DEV_BLOCK *block = dcr->block;
438
439    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
440    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
441       return true;
442    }
443
444    hlen = sizeof(spool_hdr);
445    wlen = block->binbuf;
446    P(dcr->dev->spool_mutex);
447    dcr->job_spool_size += hlen + wlen;
448    dcr->dev->spool_size += hlen + wlen;
449    if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
450        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
451       despool = true;
452    }
453    V(dcr->dev->spool_mutex);
454    P(mutex);
455    spool_stats.data_size += hlen + wlen;
456    if (spool_stats.data_size > spool_stats.max_data_size) {
457       spool_stats.max_data_size = spool_stats.data_size;
458    }
459    V(mutex);
460    if (despool) {
461 #ifdef xDEBUG
462       char ec1[30], ec2[30], ec3[30], ec4[30];
463       Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
464             "max_job_size=%s job_size=%s\n",
465             edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
466             edit_uint64_with_commas(dcr->job_spool_size, ec2),
467             edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
468             edit_uint64_with_commas(dcr->dev->spool_size, ec4));
469 #endif
470       Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
471       if (!despool_data(dcr, false)) {
472          Pmsg0(000, _("Bad return from despool in write_block.\n"));
473          return false;
474       }
475       /* Despooling cleared these variables so reset them */
476       P(dcr->dev->spool_mutex);
477       dcr->job_spool_size += hlen + wlen;
478       dcr->dev->spool_size += hlen + wlen;
479       V(dcr->dev->spool_mutex);
480       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
481    }
482
483
484    if (!write_spool_header(dcr)) {
485       return false;
486    }
487    if (!write_spool_data(dcr)) {
488      return false;
489    }
490
491    Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
492    empty_block(block);
493    return true;
494 }
495
496 static bool write_spool_header(DCR *dcr)
497 {
498    spool_hdr hdr;
499    ssize_t stat;
500    DEV_BLOCK *block = dcr->block;
501
502    hdr.FirstIndex = block->FirstIndex;
503    hdr.LastIndex = block->LastIndex;
504    hdr.len = block->binbuf;
505
506    /* Write header */
507    for (int retry=0; retry<=1; retry++) {
508       stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
509       if (stat == -1) {
510          berrno be;
511          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
512               be.bstrerror());
513       }
514       if (stat != (ssize_t)sizeof(hdr)) {
515          /* If we wrote something, truncate it, then despool */
516          if (stat != -1) {
517 #if defined(HAVE_WIN32)
518             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
519 #else
520             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
521 #endif
522             if (ftruncate(dcr->spool_fd, pos - stat) != 0) {
523                berrno be;
524                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
525                   be.bstrerror());
526               /* Note, try continuing despite ftruncate problem */
527             }
528          }
529          if (!despool_data(dcr, false)) {
530             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
531             return false;
532          }
533          continue;                    /* try again */
534       }
535       return true;
536    }
537    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
538    return false;
539 }
540
541 static bool write_spool_data(DCR *dcr)
542 {
543    ssize_t stat;
544    DEV_BLOCK *block = dcr->block;
545
546    /* Write data */
547    for (int retry=0; retry<=1; retry++) {
548       stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
549       if (stat == -1) {
550          berrno be;
551          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
552               be.bstrerror());
553       }
554       if (stat != (ssize_t)block->binbuf) {
555          /*
556           * If we wrote something, truncate it and the header, then despool
557           */
558          if (stat != -1) {
559 #if defined(HAVE_WIN32)
560             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
561 #else
562             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
563 #endif
564             if (ftruncate(dcr->spool_fd, pos - stat - sizeof(spool_hdr)) != 0) {
565                berrno be;
566                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
567                   be.bstrerror());
568                /* Note, try continuing despite ftruncate problem */
569             }
570          }
571          if (!despool_data(dcr, false)) {
572             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
573             return false;
574          }
575          if (!write_spool_header(dcr)) {
576             return false;
577          }
578          continue;                    /* try again */
579       }
580       return true;
581    }
582    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
583    return false;
584 }
585
586
587
588 bool are_attributes_spooled(JCR *jcr)
589 {
590    return jcr->spool_attributes && jcr->dir_bsock->m_spool_fd;
591 }
592
593 /*
594  * Create spool file for attributes.
595  *  This is done by "attaching" to the bsock, and when
596  *  it is called, the output is written to a file.
597  *  The actual spooling is turned on and off in
598  *  append.c only during writing of the attributes.
599  */
600 bool begin_attribute_spool(JCR *jcr)
601 {
602    if (!jcr->no_attributes && jcr->spool_attributes) {
603       return open_attr_spool_file(jcr, jcr->dir_bsock);
604    }
605    return true;
606 }
607
608 bool discard_attribute_spool(JCR *jcr)
609 {
610    if (are_attributes_spooled(jcr)) {
611       return close_attr_spool_file(jcr, jcr->dir_bsock);
612    }
613    return true;
614 }
615
616 static void update_attr_spool_size(ssize_t size)
617 {
618    P(mutex);
619    if (size > 0) {
620      if ((spool_stats.attr_size - size) > 0) {
621         spool_stats.attr_size -= size;
622      } else {
623         spool_stats.attr_size = 0;
624      }
625    }
626    V(mutex);
627 }
628
629 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
630 {
631    Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
632       jcr->Job, fd);
633 }
634
635 static bool blast_attr_spool_file(JCR *jcr, boffset_t size)
636 {
637    /* send full spool file name */
638    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
639    make_unique_spool_filename(jcr, &name, jcr->dir_bsock->m_fd);
640    bash_spaces(name);
641    jcr->dir_bsock->fsend("BlastAttr Job=%s File=%s\n",
642                          jcr->Job, name);
643    free_pool_memory(name);
644    
645    if (jcr->dir_bsock->recv() <= 0) {
646       Jmsg(jcr, M_FATAL, 0, _("Network error on BlastAttributes.\n"));
647       return false;
648    }
649    
650    if (!bstrcmp(jcr->dir_bsock->msg, "1000 OK BlastAttr\n")) {
651       return false;
652    }
653    return true;
654 }
655
656 bool commit_attribute_spool(JCR *jcr)
657 {
658    boffset_t size;
659    char ec1[30];
660    char tbuf[100];
661
662    Dmsg1(100, "Commit attributes at %s\n", bstrftimes(tbuf, sizeof(tbuf),
663          (utime_t)time(NULL)));
664    if (are_attributes_spooled(jcr)) {
665       if (fseeko(jcr->dir_bsock->m_spool_fd, 0, SEEK_END) != 0) {
666          berrno be;
667          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
668               be.bstrerror());
669          goto bail_out;
670       }
671       size = ftello(jcr->dir_bsock->m_spool_fd);
672       if (size < 0) {
673          berrno be;
674          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
675               be.bstrerror());
676          goto bail_out;
677       }
678       P(mutex);
679       if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
680          spool_stats.max_attr_size = spool_stats.attr_size + size;
681       }
682       spool_stats.attr_size += size;
683       V(mutex);
684       set_jcr_job_status(jcr, JS_AttrDespooling);
685       dir_send_job_status(jcr);
686       Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
687             edit_uint64_with_commas(size, ec1));
688
689       if (!blast_attr_spool_file(jcr, size)) {
690          /* Can't read spool file from director side,
691           * send content over network.
692           */
693          jcr->dir_bsock->despool(update_attr_spool_size, size);
694       }
695       return close_attr_spool_file(jcr, jcr->dir_bsock);
696    }
697    return true;
698
699 bail_out:
700    close_attr_spool_file(jcr, jcr->dir_bsock);
701    return false;
702 }
703
704 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
705 {
706    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
707
708    make_unique_spool_filename(jcr, &name, bs->m_fd);
709    bs->m_spool_fd = fopen(name, "w+b");
710    if (!bs->m_spool_fd) {
711       berrno be;
712       Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
713            be.bstrerror());
714       free_pool_memory(name);
715       return false;
716    }
717    P(mutex);
718    spool_stats.attr_jobs++;
719    V(mutex);
720    free_pool_memory(name);
721    return true;
722 }
723
724 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
725 {
726    POOLMEM *name;
727
728    char tbuf[100];
729
730    Dmsg1(100, "Close attr spool file at %s\n", bstrftimes(tbuf, sizeof(tbuf),
731          (utime_t)time(NULL)));
732    if (!bs->m_spool_fd) {
733       return true;
734    }
735    name = get_pool_memory(PM_MESSAGE);
736    P(mutex);
737    spool_stats.attr_jobs--;
738    spool_stats.total_attr_jobs++;
739    V(mutex);
740    make_unique_spool_filename(jcr, &name, bs->m_fd);
741    fclose(bs->m_spool_fd);
742    unlink(name);
743    free_pool_memory(name);
744    bs->m_spool_fd = NULL;
745    bs->clear_spooling();
746    return true;
747 }