]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
Create patch that may fix bug #1298 and bug #1304, which causes
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2004-2009 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  *  Spooling code
30  *
31  *      Kern Sibbald, March 2004
32  *
33  *  Version $Id$
34  */
35
36 #include "bacula.h"
37 #include "stored.h"
38
39 /* Forward referenced subroutines */
40 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
41 static bool open_data_spool_file(DCR *dcr);
42 static bool close_data_spool_file(DCR *dcr);
43 static bool despool_data(DCR *dcr, bool commit);
44 static int  read_block_from_spool_file(DCR *dcr);
45 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
46 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
47 static bool write_spool_header(DCR *dcr);
48 static bool write_spool_data(DCR *dcr);
49
50 struct spool_stats_t {
51    uint32_t data_jobs;                /* current jobs spooling data */
52    uint32_t attr_jobs;
53    uint32_t total_data_jobs;          /* total jobs to have spooled data */
54    uint32_t total_attr_jobs;
55    int64_t max_data_size;             /* max data size */
56    int64_t max_attr_size;
57    int64_t data_size;                 /* current data size (all jobs running) */
58    int64_t attr_size;
59 };
60
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62 spool_stats_t spool_stats;
63
64 /*
65  * Header for data spool record */
66 struct spool_hdr {
67    int32_t  FirstIndex;               /* FirstIndex for buffer */
68    int32_t  LastIndex;                /* LastIndex for buffer */
69    uint32_t len;                      /* length of next buffer */
70 };
71
72 enum {
73    RB_EOT = 1,
74    RB_ERROR,
75    RB_OK
76 };
77
78 void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
79 {
80    char ed1[30], ed2[30];
81    POOL_MEM msg(PM_MESSAGE);
82    int len;
83
84    len = Mmsg(msg, _("Spooling statistics:\n"));
85
86    if (spool_stats.data_jobs || spool_stats.max_data_size) {
87       len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
88          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
89          spool_stats.total_data_jobs,
90          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
91
92       sendit(msg.c_str(), len, arg);
93    }
94    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
95       len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
96          spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
97          spool_stats.total_attr_jobs,
98          edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
99    
100       sendit(msg.c_str(), len, arg);
101    }
102 }
103
104 bool begin_data_spool(DCR *dcr)
105 {
106    bool stat = true;
107    if (!dcr->dev->is_dvd() && dcr->jcr->spool_data) {
108       Dmsg0(100, "Turning on data spooling\n");
109       dcr->spool_data = true;
110       stat = open_data_spool_file(dcr);
111       if (stat) {
112          dcr->spooling = true;
113          Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
114          P(mutex);
115          spool_stats.data_jobs++;
116          V(mutex);
117       }
118    }
119    return stat;
120 }
121
122 bool discard_data_spool(DCR *dcr)
123 {
124    if (dcr->spooling) {
125       Dmsg0(100, "Data spooling discarded\n");
126       return close_data_spool_file(dcr);
127    }
128    return true;
129 }
130
131 bool commit_data_spool(DCR *dcr)
132 {
133    bool stat;
134
135    if (dcr->spooling) {
136       Dmsg0(100, "Committing spooled data\n");
137       stat = despool_data(dcr, true /*commit*/);
138       if (!stat) {
139          Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
140          close_data_spool_file(dcr);
141          return false;
142       }
143       return close_data_spool_file(dcr);
144    }
145    return true;
146 }
147
148 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
149 {
150    const char *dir;
151    if (dcr->dev->device->spool_directory) {
152       dir = dcr->dev->device->spool_directory;
153    } else {
154       dir = working_directory;
155    }
156    Mmsg(name, "%s/%s.data.%u.%s.%s.spool", dir, my_name, dcr->jcr->JobId,
157         dcr->jcr->Job, dcr->device->hdr.name);
158 }
159
160
161 static bool open_data_spool_file(DCR *dcr)
162 {
163    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
164    int spool_fd;
165
166    make_unique_data_spool_filename(dcr, &name);
167    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
168       dcr->spool_fd = spool_fd;
169       dcr->jcr->spool_attributes = true;
170    } else {
171       berrno be;
172       Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
173            be.bstrerror());
174       free_pool_memory(name);
175       return false;
176    }
177    Dmsg1(100, "Created spool file: %s\n", name);
178    free_pool_memory(name);
179    return true;
180 }
181
182 static bool close_data_spool_file(DCR *dcr)
183 {
184    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
185
186    P(mutex);
187    spool_stats.data_jobs--;
188    spool_stats.total_data_jobs++;
189    if (spool_stats.data_size < dcr->job_spool_size) {
190       spool_stats.data_size = 0;
191    } else {
192       spool_stats.data_size -= dcr->job_spool_size;
193    }
194    dcr->job_spool_size = 0;
195    V(mutex);
196
197    make_unique_data_spool_filename(dcr, &name);
198    close(dcr->spool_fd);
199    dcr->spool_fd = -1;
200    dcr->spooling = false;
201    unlink(name);
202    Dmsg1(100, "Deleted spool file: %s\n", name);
203    free_pool_memory(name);
204    return true;
205 }
206
207 static const char *spool_name = "*spool*";
208
209 /*
210  * NB! This routine locks the device, but if committing will
211  *     not unlock it. If not committing, it will be unlocked.
212  */
213 static bool despool_data(DCR *dcr, bool commit)
214 {
215    DEVICE *rdev;
216    DCR *rdcr;
217    bool ok = true;
218    DEV_BLOCK *block;
219    JCR *jcr = dcr->jcr;
220    int stat;
221    char ec1[50];
222
223    Dmsg0(100, "Despooling data\n");
224    /*
225     * Commit means that the job is done, so we commit, otherwise, we
226     *  are despooling because of user spool size max or some error  
227     *  (e.g. filesystem full).
228     */
229    if (commit) {
230       Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
231          jcr->dcr->VolumeName,
232          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
233       set_jcr_job_status(jcr, JS_DataCommitting);
234    } else {
235       Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
236          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
237       set_jcr_job_status(jcr, JS_DataDespooling);
238    }
239    set_jcr_job_status(jcr, JS_DataDespooling);
240    dir_send_job_status(jcr);
241    dcr->despool_wait = true;
242    dcr->spooling = false;
243    /*
244     * We work with device blocked, but not locked so that
245     *  other threads -- e.g. reservations can lock the device
246     *  structure.
247     */
248    dcr->dblock(BST_DESPOOLING);
249    dcr->despool_wait = false;
250    dcr->despooling = true;
251
252    /*
253     * This is really quite kludgy and should be fixed some time.
254     * We create a dev structure to read from the spool file
255     * in rdev and rdcr.
256     */
257    rdev = (DEVICE *)malloc(sizeof(DEVICE));
258    memset(rdev, 0, sizeof(DEVICE));
259    rdev->dev_name = get_memory(strlen(spool_name)+1);
260    bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
261    rdev->errmsg = get_pool_memory(PM_EMSG);
262    *rdev->errmsg = 0;
263    rdev->max_block_size = dcr->dev->max_block_size;
264    rdev->min_block_size = dcr->dev->min_block_size;
265    rdev->device = dcr->dev->device;
266    rdcr = new_dcr(jcr, NULL, rdev);
267    rdcr->spool_fd = dcr->spool_fd;
268    block = dcr->block;                /* save block */
269    dcr->block = rdcr->block;          /* make read and write block the same */
270
271    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
272    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
273
274 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
275    posix_fadvise(rdcr->spool_fd, 0, 0, POSIX_FADV_WILLNEED);
276 #endif
277
278    /* Add run time, to get current wait time */
279    int32_t despool_start = time(NULL) - jcr->run_time;
280
281    set_new_file_parameters(dcr);
282
283    for ( ; ok; ) {
284       if (job_canceled(jcr)) {
285          ok = false;
286          break;
287       }
288       stat = read_block_from_spool_file(rdcr);
289       if (stat == RB_EOT) {
290          break;
291       } else if (stat == RB_ERROR) {
292          ok = false;
293          break;
294       }
295       ok = write_block_to_device(dcr);
296       if (!ok) {
297          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
298                dcr->dev->print_name(), dcr->dev->bstrerror());
299          Dmsg2(000, "Fatal append error on device %s: ERR=%s\n",
300                dcr->dev->print_name(), dcr->dev->bstrerror());
301       }
302       Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
303    }
304
305    if (!dir_create_jobmedia_record(dcr)) {
306       Jmsg2(jcr, M_FATAL, 0, _("Could not create JobMedia record for Volume=\"%s\" Job=%s\n"),
307          dcr->VolCatInfo.VolCatName, jcr->Job);
308    }
309    /* Set new file/block parameters for current dcr */
310    set_new_file_parameters(dcr);
311
312    /*
313     * Subtracting run_time give us elapsed time - wait_time since 
314     * we started despooling. Note, don't use time_t as it is 32 or 64
315     * bits depending on the OS and doesn't edit with %d
316     */
317    int32_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
318
319    if (despool_elapsed <= 0) {
320       despool_elapsed = 1;
321    }
322
323    Jmsg(dcr->jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s bytes/second\n"),
324          despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
325          edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
326
327    dcr->block = block;                /* reset block */
328
329    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
330    if (ftruncate(rdcr->spool_fd, 0) != 0) {
331       berrno be;
332       Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
333          be.bstrerror());
334       /* Note, try continuing despite ftruncate problem */
335    }
336
337    P(mutex);
338    if (spool_stats.data_size < dcr->job_spool_size) {
339       spool_stats.data_size = 0;
340    } else {
341       spool_stats.data_size -= dcr->job_spool_size;
342    }
343    V(mutex);
344    P(dcr->dev->spool_mutex);
345    dcr->dev->spool_size -= dcr->job_spool_size;
346    dcr->job_spool_size = 0;            /* zap size in input dcr */
347    V(dcr->dev->spool_mutex);
348    free_memory(rdev->dev_name);
349    free_pool_memory(rdev->errmsg);
350    /* Be careful to NULL the jcr and free rdev after free_dcr() */
351    rdcr->jcr = NULL;
352    rdcr->dev = NULL;
353    free_dcr(rdcr);
354    free(rdev);
355    dcr->spooling = true;           /* turn on spooling again */
356    dcr->despooling = false;
357
358    /* 
359     * We are done, so unblock the device, but if we have done a 
360     *  commit, leave it locked so that the job cleanup does not
361     *  need to wait to release the device (no re-acquire of the lock).
362     */
363    dcr->dlock();
364    unblock_device(dcr->dev);
365    /* If doing a commit, leave the device locked -- unlocked in release_device() */
366    if (!commit) {
367       dcr->dunlock();
368    }
369    set_jcr_job_status(jcr, JS_Running);
370    dir_send_job_status(jcr);
371    return ok;
372 }
373
374 /*
375  * Read a block from the spool file
376  *
377  *  Returns RB_OK on success
378  *          RB_EOT when file done
379  *          RB_ERROR on error
380  */
381 static int read_block_from_spool_file(DCR *dcr)
382 {
383    uint32_t rlen;
384    ssize_t stat;
385    spool_hdr hdr;
386    DEV_BLOCK *block = dcr->block;
387
388    rlen = sizeof(hdr);
389    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
390    if (stat == 0) {
391       Dmsg0(100, "EOT on spool read.\n");
392       return RB_EOT;
393    } else if (stat != (ssize_t)rlen) {
394       if (stat == -1) {
395          berrno be;
396          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
397               be.bstrerror());
398       } else {
399          Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
400          Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
401       }
402       return RB_ERROR;
403    }
404    rlen = hdr.len;
405    if (rlen > block->buf_len) {
406       Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
407       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
408       return RB_ERROR;
409    }
410    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
411    if (stat != (ssize_t)rlen) {
412       Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
413       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
414       return RB_ERROR;
415    }
416    /* Setup write pointers */
417    block->binbuf = rlen;
418    block->bufp = block->buf + block->binbuf;
419    block->FirstIndex = hdr.FirstIndex;
420    block->LastIndex = hdr.LastIndex;
421    block->VolSessionId = dcr->jcr->VolSessionId;
422    block->VolSessionTime = dcr->jcr->VolSessionTime;
423    Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
424    return RB_OK;
425 }
426
427 /*
428  * Write a block to the spool file
429  *
430  *  Returns: true on success or EOT
431  *           false on hard error
432  */
433 bool write_block_to_spool_file(DCR *dcr)
434 {
435    uint32_t wlen, hlen;               /* length to write */
436    bool despool = false;
437    DEV_BLOCK *block = dcr->block;
438
439    if (job_canceled(dcr->jcr)) {
440       return false;
441    }
442    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
443    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
444       return true;
445    }
446
447    hlen = sizeof(spool_hdr);
448    wlen = block->binbuf;
449    P(dcr->dev->spool_mutex);
450    dcr->job_spool_size += hlen + wlen;
451    dcr->dev->spool_size += hlen + wlen;
452    if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
453        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
454       despool = true;
455    }
456    V(dcr->dev->spool_mutex);
457    P(mutex);
458    spool_stats.data_size += hlen + wlen;
459    if (spool_stats.data_size > spool_stats.max_data_size) {
460       spool_stats.max_data_size = spool_stats.data_size;
461    }
462    V(mutex);
463    if (despool) {
464 #ifdef xDEBUG
465       char ec1[30], ec2[30], ec3[30], ec4[30];
466       Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
467             "max_job_size=%s job_size=%s\n",
468             edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
469             edit_uint64_with_commas(dcr->job_spool_size, ec2),
470             edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
471             edit_uint64_with_commas(dcr->dev->spool_size, ec4));
472 #endif
473       Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
474       if (!despool_data(dcr, false)) {
475          Pmsg0(000, _("Bad return from despool in write_block.\n"));
476          return false;
477       }
478       /* Despooling cleared these variables so reset them */
479       P(dcr->dev->spool_mutex);
480       dcr->job_spool_size += hlen + wlen;
481       dcr->dev->spool_size += hlen + wlen;
482       V(dcr->dev->spool_mutex);
483       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
484    }
485
486
487    if (!write_spool_header(dcr)) {
488       return false;
489    }
490    if (!write_spool_data(dcr)) {
491      return false;
492    }
493
494    Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
495    empty_block(block);
496    return true;
497 }
498
499 static bool write_spool_header(DCR *dcr)
500 {
501    spool_hdr hdr;
502    ssize_t stat;
503    DEV_BLOCK *block = dcr->block;
504
505    hdr.FirstIndex = block->FirstIndex;
506    hdr.LastIndex = block->LastIndex;
507    hdr.len = block->binbuf;
508
509    /* Write header */
510    for (int retry=0; retry<=1; retry++) {
511       stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
512       if (stat == -1) {
513          berrno be;
514          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
515               be.bstrerror());
516       }
517       if (stat != (ssize_t)sizeof(hdr)) {
518          /* If we wrote something, truncate it, then despool */
519          if (stat != -1) {
520 #if defined(HAVE_WIN32)
521             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
522 #else
523             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
524 #endif
525             if (ftruncate(dcr->spool_fd, pos - stat) != 0) {
526                berrno be;
527                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
528                   be.bstrerror());
529               /* Note, try continuing despite ftruncate problem */
530             }
531          }
532          if (!despool_data(dcr, false)) {
533             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
534             return false;
535          }
536          continue;                    /* try again */
537       }
538       return true;
539    }
540    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
541    return false;
542 }
543
544 static bool write_spool_data(DCR *dcr)
545 {
546    ssize_t stat;
547    DEV_BLOCK *block = dcr->block;
548
549    /* Write data */
550    for (int retry=0; retry<=1; retry++) {
551       stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
552       if (stat == -1) {
553          berrno be;
554          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
555               be.bstrerror());
556       }
557       if (stat != (ssize_t)block->binbuf) {
558          /*
559           * If we wrote something, truncate it and the header, then despool
560           */
561          if (stat != -1) {
562 #if defined(HAVE_WIN32)
563             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
564 #else
565             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
566 #endif
567             if (ftruncate(dcr->spool_fd, pos - stat - sizeof(spool_hdr)) != 0) {
568                berrno be;
569                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
570                   be.bstrerror());
571                /* Note, try continuing despite ftruncate problem */
572             }
573          }
574          if (!despool_data(dcr, false)) {
575             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
576             return false;
577          }
578          if (!write_spool_header(dcr)) {
579             return false;
580          }
581          continue;                    /* try again */
582       }
583       return true;
584    }
585    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
586    return false;
587 }
588
589
590
591 bool are_attributes_spooled(JCR *jcr)
592 {
593    return jcr->spool_attributes && jcr->dir_bsock->m_spool_fd;
594 }
595
596 /*
597  * Create spool file for attributes.
598  *  This is done by "attaching" to the bsock, and when
599  *  it is called, the output is written to a file.
600  *  The actual spooling is turned on and off in
601  *  append.c only during writing of the attributes.
602  */
603 bool begin_attribute_spool(JCR *jcr)
604 {
605    if (!jcr->no_attributes && jcr->spool_attributes) {
606       return open_attr_spool_file(jcr, jcr->dir_bsock);
607    }
608    return true;
609 }
610
611 bool discard_attribute_spool(JCR *jcr)
612 {
613    if (are_attributes_spooled(jcr)) {
614       return close_attr_spool_file(jcr, jcr->dir_bsock);
615    }
616    return true;
617 }
618
619 static void update_attr_spool_size(ssize_t size)
620 {
621    P(mutex);
622    if (size > 0) {
623      if ((spool_stats.attr_size - size) > 0) {
624         spool_stats.attr_size -= size;
625      } else {
626         spool_stats.attr_size = 0;
627      }
628    }
629    V(mutex);
630 }
631
632 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
633 {
634    Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
635       jcr->Job, fd);
636 }
637
638 static bool blast_attr_spool_file(JCR *jcr, boffset_t size)
639 {
640    /* send full spool file name */
641    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
642    make_unique_spool_filename(jcr, &name, jcr->dir_bsock->m_fd);
643    bash_spaces(name);
644    jcr->dir_bsock->fsend("BlastAttr Job=%s File=%s\n",
645                          jcr->Job, name);
646    free_pool_memory(name);
647    
648    if (jcr->dir_bsock->recv() <= 0) {
649       Jmsg(jcr, M_FATAL, 0, _("Network error on BlastAttributes.\n"));
650       return false;
651    }
652    
653    if (!bstrcmp(jcr->dir_bsock->msg, "1000 OK BlastAttr\n")) {
654       return false;
655    }
656    return true;
657 }
658
659 bool commit_attribute_spool(JCR *jcr)
660 {
661    boffset_t size;
662    char ec1[30];
663    char tbuf[100];
664
665    Dmsg1(100, "Commit attributes at %s\n", bstrftimes(tbuf, sizeof(tbuf),
666          (utime_t)time(NULL)));
667    if (are_attributes_spooled(jcr)) {
668       if (fseeko(jcr->dir_bsock->m_spool_fd, 0, SEEK_END) != 0) {
669          berrno be;
670          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
671               be.bstrerror());
672          goto bail_out;
673       }
674       size = ftello(jcr->dir_bsock->m_spool_fd);
675       if (size < 0) {
676          berrno be;
677          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
678               be.bstrerror());
679          goto bail_out;
680       }
681       P(mutex);
682       if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
683          spool_stats.max_attr_size = spool_stats.attr_size + size;
684       }
685       spool_stats.attr_size += size;
686       V(mutex);
687       set_jcr_job_status(jcr, JS_AttrDespooling);
688       dir_send_job_status(jcr);
689       Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
690             edit_uint64_with_commas(size, ec1));
691
692       if (!blast_attr_spool_file(jcr, size)) {
693          /* Can't read spool file from director side,
694           * send content over network.
695           */
696          jcr->dir_bsock->despool(update_attr_spool_size, size);
697       }
698       return close_attr_spool_file(jcr, jcr->dir_bsock);
699    }
700    return true;
701
702 bail_out:
703    close_attr_spool_file(jcr, jcr->dir_bsock);
704    return false;
705 }
706
707 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
708 {
709    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
710
711    make_unique_spool_filename(jcr, &name, bs->m_fd);
712    bs->m_spool_fd = fopen(name, "w+b");
713    if (!bs->m_spool_fd) {
714       berrno be;
715       Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
716            be.bstrerror());
717       free_pool_memory(name);
718       return false;
719    }
720    P(mutex);
721    spool_stats.attr_jobs++;
722    V(mutex);
723    free_pool_memory(name);
724    return true;
725 }
726
727 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
728 {
729    POOLMEM *name;
730
731    char tbuf[100];
732
733    Dmsg1(100, "Close attr spool file at %s\n", bstrftimes(tbuf, sizeof(tbuf),
734          (utime_t)time(NULL)));
735    if (!bs->m_spool_fd) {
736       return true;
737    }
738    name = get_pool_memory(PM_MESSAGE);
739    P(mutex);
740    spool_stats.attr_jobs--;
741    spool_stats.total_attr_jobs++;
742    V(mutex);
743    make_unique_spool_filename(jcr, &name, bs->m_fd);
744    fclose(bs->m_spool_fd);
745    unlink(name);
746    free_pool_memory(name);
747    bs->m_spool_fd = NULL;
748    bs->clear_spooling();
749    return true;
750 }