]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
update version
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2004-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  *  Spooling code
30  *
31  *      Kern Sibbald, March 2004
32  *
33  */
34
35 #include "bacula.h"
36 #include "stored.h"
37
38 /* Forward referenced subroutines */
39 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
40 static bool open_data_spool_file(DCR *dcr);
41 static bool close_data_spool_file(DCR *dcr);
42 static bool despool_data(DCR *dcr, bool commit);
43 static int  read_block_from_spool_file(DCR *dcr);
44 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
45 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
46 static bool write_spool_header(DCR *dcr);
47 static bool write_spool_data(DCR *dcr);
48
49 struct spool_stats_t {
50    uint32_t data_jobs;                /* current jobs spooling data */
51    uint32_t attr_jobs;
52    uint32_t total_data_jobs;          /* total jobs to have spooled data */
53    uint32_t total_attr_jobs;
54    int64_t max_data_size;             /* max data size */
55    int64_t max_attr_size;
56    int64_t data_size;                 /* current data size (all jobs running) */
57    int64_t attr_size;
58 };
59
60 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
61 spool_stats_t spool_stats;
62
63 /*
64  * Header for data spool record */
65 struct spool_hdr {
66    int32_t  FirstIndex;               /* FirstIndex for buffer */
67    int32_t  LastIndex;                /* LastIndex for buffer */
68    uint32_t len;                      /* length of next buffer */
69 };
70
71 enum {
72    RB_EOT = 1,
73    RB_ERROR,
74    RB_OK
75 };
76
77 void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
78 {
79    char ed1[30], ed2[30];
80    POOL_MEM msg(PM_MESSAGE);
81    int len;
82
83    len = Mmsg(msg, _("Spooling statistics:\n"));
84
85    if (spool_stats.data_jobs || spool_stats.max_data_size) {
86       len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
87          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
88          spool_stats.total_data_jobs,
89          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
90
91       sendit(msg.c_str(), len, arg);
92    }
93    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
94       len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
95          spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
96          spool_stats.total_attr_jobs,
97          edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
98    
99       sendit(msg.c_str(), len, arg);
100    }
101 }
102
103 bool begin_data_spool(DCR *dcr)
104 {
105    bool stat = true;
106    if (!dcr->dev->is_dvd() && dcr->jcr->spool_data) {
107       Dmsg0(100, "Turning on data spooling\n");
108       dcr->spool_data = true;
109       stat = open_data_spool_file(dcr);
110       if (stat) {
111          dcr->spooling = true;
112          Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
113          P(mutex);
114          spool_stats.data_jobs++;
115          V(mutex);
116       }
117    }
118    return stat;
119 }
120
121 bool discard_data_spool(DCR *dcr)
122 {
123    if (dcr->spooling) {
124       Dmsg0(100, "Data spooling discarded\n");
125       return close_data_spool_file(dcr);
126    }
127    return true;
128 }
129
130 bool commit_data_spool(DCR *dcr)
131 {
132    bool stat;
133
134    if (dcr->spooling) {
135       Dmsg0(100, "Committing spooled data\n");
136       stat = despool_data(dcr, true /*commit*/);
137       if (!stat) {
138          Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
139          close_data_spool_file(dcr);
140          return false;
141       }
142       return close_data_spool_file(dcr);
143    }
144    return true;
145 }
146
147 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
148 {
149    const char *dir;
150    if (dcr->dev->device->spool_directory) {
151       dir = dcr->dev->device->spool_directory;
152    } else {
153       dir = working_directory;
154    }
155    Mmsg(name, "%s/%s.data.%u.%s.%s.spool", dir, my_name, dcr->jcr->JobId,
156         dcr->jcr->Job, dcr->device->hdr.name);
157 }
158
159
160 static bool open_data_spool_file(DCR *dcr)
161 {
162    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
163    int spool_fd;
164
165    make_unique_data_spool_filename(dcr, &name);
166    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
167       dcr->spool_fd = spool_fd;
168       dcr->jcr->spool_attributes = true;
169    } else {
170       berrno be;
171       Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
172            be.bstrerror());
173       free_pool_memory(name);
174       return false;
175    }
176    Dmsg1(100, "Created spool file: %s\n", name);
177    free_pool_memory(name);
178    return true;
179 }
180
181 static bool close_data_spool_file(DCR *dcr)
182 {
183    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
184
185    P(mutex);
186    spool_stats.data_jobs--;
187    spool_stats.total_data_jobs++;
188    if (spool_stats.data_size < dcr->job_spool_size) {
189       spool_stats.data_size = 0;
190    } else {
191       spool_stats.data_size -= dcr->job_spool_size;
192    }
193    dcr->job_spool_size = 0;
194    V(mutex);
195
196    make_unique_data_spool_filename(dcr, &name);
197    close(dcr->spool_fd);
198    dcr->spool_fd = -1;
199    dcr->spooling = false;
200    unlink(name);
201    Dmsg1(100, "Deleted spool file: %s\n", name);
202    free_pool_memory(name);
203    return true;
204 }
205
206 static const char *spool_name = "*spool*";
207
208 /*
209  * NB! This routine locks the device, but if committing will
210  *     not unlock it. If not committing, it will be unlocked.
211  */
212 static bool despool_data(DCR *dcr, bool commit)
213 {
214    DEVICE *rdev;
215    DCR *rdcr;
216    bool ok = true;
217    DEV_BLOCK *block;
218    JCR *jcr = dcr->jcr;
219    int stat;
220    char ec1[50];
221
222    Dmsg0(100, "Despooling data\n");
223    if (jcr->dcr->job_spool_size == 0) {
224       Jmsg(jcr, M_WARNING, 0, _("Despooling zero bytes. Your disk is probably FULL!\n"));
225    }
226
227    /*
228     * Commit means that the job is done, so we commit, otherwise, we
229     *  are despooling because of user spool size max or some error  
230     *  (e.g. filesystem full).
231     */
232    if (commit) {
233       Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
234          jcr->dcr->VolumeName,
235          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
236       set_jcr_job_status(jcr, JS_DataCommitting);
237    } else {
238       Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
239          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
240       set_jcr_job_status(jcr, JS_DataDespooling);
241    }
242    set_jcr_job_status(jcr, JS_DataDespooling);
243    dir_send_job_status(jcr);
244    dcr->despool_wait = true;
245    dcr->spooling = false;
246    /*
247     * We work with device blocked, but not locked so that
248     *  other threads -- e.g. reservations can lock the device
249     *  structure.
250     */
251    dcr->dblock(BST_DESPOOLING);
252    dcr->despool_wait = false;
253    dcr->despooling = true;
254
255    /*
256     * This is really quite kludgy and should be fixed some time.
257     * We create a dev structure to read from the spool file
258     * in rdev and rdcr.
259     */
260    rdev = (DEVICE *)malloc(sizeof(DEVICE));
261    memset(rdev, 0, sizeof(DEVICE));
262    rdev->dev_name = get_memory(strlen(spool_name)+1);
263    bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
264    rdev->errmsg = get_pool_memory(PM_EMSG);
265    *rdev->errmsg = 0;
266    rdev->max_block_size = dcr->dev->max_block_size;
267    rdev->min_block_size = dcr->dev->min_block_size;
268    rdev->device = dcr->dev->device;
269    rdcr = new_dcr(jcr, NULL, rdev);
270    rdcr->spool_fd = dcr->spool_fd;
271    block = dcr->block;                /* save block */
272    dcr->block = rdcr->block;          /* make read and write block the same */
273
274    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
275    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
276
277 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
278    posix_fadvise(rdcr->spool_fd, 0, 0, POSIX_FADV_WILLNEED);
279 #endif
280
281    /* Add run time, to get current wait time */
282    int32_t despool_start = time(NULL) - jcr->run_time;
283
284    set_new_file_parameters(dcr);
285
286    for ( ; ok; ) {
287       if (job_canceled(jcr)) {
288          ok = false;
289          break;
290       }
291       stat = read_block_from_spool_file(rdcr);
292       if (stat == RB_EOT) {
293          break;
294       } else if (stat == RB_ERROR) {
295          ok = false;
296          break;
297       }
298       ok = write_block_to_device(dcr);
299       if (!ok) {
300          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
301                dcr->dev->print_name(), dcr->dev->bstrerror());
302          Dmsg2(000, "Fatal append error on device %s: ERR=%s\n",
303                dcr->dev->print_name(), dcr->dev->bstrerror());
304       }
305       Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
306    }
307
308    if (!dir_create_jobmedia_record(dcr)) {
309       Jmsg2(jcr, M_FATAL, 0, _("Could not create JobMedia record for Volume=\"%s\" Job=%s\n"),
310          dcr->getVolCatName(), jcr->Job);
311    }
312    /* Set new file/block parameters for current dcr */
313    set_new_file_parameters(dcr);
314
315    /*
316     * Subtracting run_time give us elapsed time - wait_time since 
317     * we started despooling. Note, don't use time_t as it is 32 or 64
318     * bits depending on the OS and doesn't edit with %d
319     */
320    int32_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
321
322    if (despool_elapsed <= 0) {
323       despool_elapsed = 1;
324    }
325
326    Jmsg(dcr->jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s Bytes/second\n"),
327          despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
328          edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
329
330    dcr->block = block;                /* reset block */
331
332    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
333    if (ftruncate(rdcr->spool_fd, 0) != 0) {
334       berrno be;
335       Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
336          be.bstrerror());
337       /* Note, try continuing despite ftruncate problem */
338    }
339
340    P(mutex);
341    if (spool_stats.data_size < dcr->job_spool_size) {
342       spool_stats.data_size = 0;
343    } else {
344       spool_stats.data_size -= dcr->job_spool_size;
345    }
346    V(mutex);
347    P(dcr->dev->spool_mutex);
348    dcr->dev->spool_size -= dcr->job_spool_size;
349    dcr->job_spool_size = 0;            /* zap size in input dcr */
350    V(dcr->dev->spool_mutex);
351    free_memory(rdev->dev_name);
352    free_pool_memory(rdev->errmsg);
353    /* Be careful to NULL the jcr and free rdev after free_dcr() */
354    rdcr->jcr = NULL;
355    rdcr->dev = NULL;
356    free_dcr(rdcr);
357    free(rdev);
358    dcr->spooling = true;           /* turn on spooling again */
359    dcr->despooling = false;
360    /*
361     * Note, if committing we leave the device blocked. It will be removed in
362     *  release_device();
363     */
364    if (!commit) {
365       dcr->dev->dunblock();
366    }
367    set_jcr_job_status(jcr, JS_Running);
368    dir_send_job_status(jcr);
369    return ok;
370 }
371
372 /*
373  * Read a block from the spool file
374  *
375  *  Returns RB_OK on success
376  *          RB_EOT when file done
377  *          RB_ERROR on error
378  */
379 static int read_block_from_spool_file(DCR *dcr)
380 {
381    uint32_t rlen;
382    ssize_t stat;
383    spool_hdr hdr;
384    DEV_BLOCK *block = dcr->block;
385
386    rlen = sizeof(hdr);
387    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
388    if (stat == 0) {
389       Dmsg0(100, "EOT on spool read.\n");
390       return RB_EOT;
391    } else if (stat != (ssize_t)rlen) {
392       if (stat == -1) {
393          berrno be;
394          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
395               be.bstrerror());
396       } else {
397          Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
398          Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
399       }
400       return RB_ERROR;
401    }
402    rlen = hdr.len;
403    if (rlen > block->buf_len) {
404       Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
405       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
406       return RB_ERROR;
407    }
408    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
409    if (stat != (ssize_t)rlen) {
410       Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
411       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
412       return RB_ERROR;
413    }
414    /* Setup write pointers */
415    block->binbuf = rlen;
416    block->bufp = block->buf + block->binbuf;
417    block->FirstIndex = hdr.FirstIndex;
418    block->LastIndex = hdr.LastIndex;
419    block->VolSessionId = dcr->jcr->VolSessionId;
420    block->VolSessionTime = dcr->jcr->VolSessionTime;
421    Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
422    return RB_OK;
423 }
424
425 /*
426  * Write a block to the spool file
427  *
428  *  Returns: true on success or EOT
429  *           false on hard error
430  */
431 bool write_block_to_spool_file(DCR *dcr)
432 {
433    uint32_t wlen, hlen;               /* length to write */
434    bool despool = false;
435    DEV_BLOCK *block = dcr->block;
436
437    if (job_canceled(dcr->jcr)) {
438       return false;
439    }
440    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
441    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
442       return true;
443    }
444
445    hlen = sizeof(spool_hdr);
446    wlen = block->binbuf;
447    P(dcr->dev->spool_mutex);
448    dcr->job_spool_size += hlen + wlen;
449    dcr->dev->spool_size += hlen + wlen;
450    if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
451        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
452       despool = true;
453    }
454    V(dcr->dev->spool_mutex);
455    P(mutex);
456    spool_stats.data_size += hlen + wlen;
457    if (spool_stats.data_size > spool_stats.max_data_size) {
458       spool_stats.max_data_size = spool_stats.data_size;
459    }
460    V(mutex);
461    if (despool) {
462 #ifdef xDEBUG
463       char ec1[30], ec2[30], ec3[30], ec4[30];
464       Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
465             "max_job_size=%s job_size=%s\n",
466             edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
467             edit_uint64_with_commas(dcr->job_spool_size, ec2),
468             edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
469             edit_uint64_with_commas(dcr->dev->spool_size, ec4));
470 #endif
471       Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
472       if (!despool_data(dcr, false)) {
473          Pmsg0(000, _("Bad return from despool in write_block.\n"));
474          return false;
475       }
476       /* Despooling cleared these variables so reset them */
477       P(dcr->dev->spool_mutex);
478       dcr->job_spool_size += hlen + wlen;
479       dcr->dev->spool_size += hlen + wlen;
480       V(dcr->dev->spool_mutex);
481       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
482    }
483
484
485    if (!write_spool_header(dcr)) {
486       return false;
487    }
488    if (!write_spool_data(dcr)) {
489      return false;
490    }
491
492    Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
493    empty_block(block);
494    return true;
495 }
496
497 static bool write_spool_header(DCR *dcr)
498 {
499    spool_hdr hdr;
500    ssize_t stat;
501    DEV_BLOCK *block = dcr->block;
502
503    hdr.FirstIndex = block->FirstIndex;
504    hdr.LastIndex = block->LastIndex;
505    hdr.len = block->binbuf;
506
507    /* Write header */
508    for (int retry=0; retry<=1; retry++) {
509       stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
510       if (stat == -1) {
511          berrno be;
512          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
513               be.bstrerror());
514       }
515       if (stat != (ssize_t)sizeof(hdr)) {
516          Jmsg(dcr->jcr, M_ERROR, 0, _("Error writing header to spool file."
517               " Disk probably full. Attempting recovery. Wanted to write=%d got=%d\n"),
518               (int)stat, (int)sizeof(hdr));
519          /* If we wrote something, truncate it, then despool */
520          if (stat != -1) {
521 #if defined(HAVE_WIN32)
522             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
523 #else
524             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
525 #endif
526             if (ftruncate(dcr->spool_fd, pos - stat) != 0) {
527                berrno be;
528                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
529                   be.bstrerror());
530               /* Note, try continuing despite ftruncate problem */
531             }
532          }
533          if (!despool_data(dcr, false)) {
534             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
535             return false;
536          }
537          continue;                    /* try again */
538       }
539       return true;
540    }
541    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
542    return false;
543 }
544
545 static bool write_spool_data(DCR *dcr)
546 {
547    ssize_t stat;
548    DEV_BLOCK *block = dcr->block;
549
550    /* Write data */
551    for (int retry=0; retry<=1; retry++) {
552       stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
553       if (stat == -1) {
554          berrno be;
555          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
556               be.bstrerror());
557       }
558       if (stat != (ssize_t)block->binbuf) {
559          /*
560           * If we wrote something, truncate it and the header, then despool
561           */
562          if (stat != -1) {
563 #if defined(HAVE_WIN32)
564             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
565 #else
566             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
567 #endif
568             if (ftruncate(dcr->spool_fd, pos - stat - sizeof(spool_hdr)) != 0) {
569                berrno be;
570                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
571                   be.bstrerror());
572                /* Note, try continuing despite ftruncate problem */
573             }
574          }
575          if (!despool_data(dcr, false)) {
576             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
577             return false;
578          }
579          if (!write_spool_header(dcr)) {
580             return false;
581          }
582          continue;                    /* try again */
583       }
584       return true;
585    }
586    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
587    return false;
588 }
589
590
591
592 bool are_attributes_spooled(JCR *jcr)
593 {
594    return jcr->spool_attributes && jcr->dir_bsock->m_spool_fd;
595 }
596
597 /*
598  * Create spool file for attributes.
599  *  This is done by "attaching" to the bsock, and when
600  *  it is called, the output is written to a file.
601  *  The actual spooling is turned on and off in
602  *  append.c only during writing of the attributes.
603  */
604 bool begin_attribute_spool(JCR *jcr)
605 {
606    if (!jcr->no_attributes && jcr->spool_attributes) {
607       return open_attr_spool_file(jcr, jcr->dir_bsock);
608    }
609    return true;
610 }
611
612 bool discard_attribute_spool(JCR *jcr)
613 {
614    if (are_attributes_spooled(jcr)) {
615       return close_attr_spool_file(jcr, jcr->dir_bsock);
616    }
617    return true;
618 }
619
620 static void update_attr_spool_size(ssize_t size)
621 {
622    P(mutex);
623    if (size > 0) {
624      if ((spool_stats.attr_size - size) > 0) {
625         spool_stats.attr_size -= size;
626      } else {
627         spool_stats.attr_size = 0;
628      }
629    }
630    V(mutex);
631 }
632
633 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
634 {
635    Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
636       jcr->Job, fd);
637 }
638
639 /*
640  * Tell Director where to find the attributes spool file 
641  *  Note, if we are not on the same machine, the Director will
642  *  return an error, and the higher level routine will transmit
643  *  the data record by record -- using bsock->despool().
644  */
645 static bool blast_attr_spool_file(JCR *jcr, boffset_t size)
646 {
647    /* send full spool file name */
648    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
649    make_unique_spool_filename(jcr, &name, jcr->dir_bsock->m_fd);
650    bash_spaces(name);
651    jcr->dir_bsock->fsend("BlastAttr Job=%s File=%s\n", jcr->Job, name);
652    free_pool_memory(name);
653    
654    if (jcr->dir_bsock->recv() <= 0) {
655       Jmsg(jcr, M_FATAL, 0, _("Network error on BlastAttributes.\n"));
656       return false;
657    }
658    
659    if (!bstrcmp(jcr->dir_bsock->msg, "1000 OK BlastAttr\n")) {
660       return false;
661    }
662    return true;
663 }
664
665 bool commit_attribute_spool(JCR *jcr)
666 {
667    boffset_t size;
668    char ec1[30];
669    char tbuf[100];
670
671    Dmsg1(100, "Commit attributes at %s\n", bstrftimes(tbuf, sizeof(tbuf),
672          (utime_t)time(NULL)));
673    if (are_attributes_spooled(jcr)) {
674       if (fseeko(jcr->dir_bsock->m_spool_fd, 0, SEEK_END) != 0) {
675          berrno be;
676          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
677               be.bstrerror());
678          goto bail_out;
679       }
680       size = ftello(jcr->dir_bsock->m_spool_fd);
681       if (size < 0) {
682          berrno be;
683          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
684               be.bstrerror());
685          goto bail_out;
686       }
687       P(mutex);
688       if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
689          spool_stats.max_attr_size = spool_stats.attr_size + size;
690       }
691       spool_stats.attr_size += size;
692       V(mutex);
693       set_jcr_job_status(jcr, JS_AttrDespooling);
694       dir_send_job_status(jcr);
695       Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
696             edit_uint64_with_commas(size, ec1));
697
698       if (!blast_attr_spool_file(jcr, size)) {
699          /* Can't read spool file from director side,
700           * send content over network.
701           */
702          jcr->dir_bsock->despool(update_attr_spool_size, size);
703       }
704       return close_attr_spool_file(jcr, jcr->dir_bsock);
705    }
706    return true;
707
708 bail_out:
709    close_attr_spool_file(jcr, jcr->dir_bsock);
710    return false;
711 }
712
713 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
714 {
715    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
716
717    make_unique_spool_filename(jcr, &name, bs->m_fd);
718    bs->m_spool_fd = fopen(name, "w+b");
719    if (!bs->m_spool_fd) {
720       berrno be;
721       Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
722            be.bstrerror());
723       free_pool_memory(name);
724       return false;
725    }
726    P(mutex);
727    spool_stats.attr_jobs++;
728    V(mutex);
729    free_pool_memory(name);
730    return true;
731 }
732
733 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
734 {
735    POOLMEM *name;
736
737    char tbuf[100];
738
739    Dmsg1(100, "Close attr spool file at %s\n", bstrftimes(tbuf, sizeof(tbuf),
740          (utime_t)time(NULL)));
741    if (!bs->m_spool_fd) {
742       return true;
743    }
744    name = get_pool_memory(PM_MESSAGE);
745    P(mutex);
746    spool_stats.attr_jobs--;
747    spool_stats.total_attr_jobs++;
748    V(mutex);
749    make_unique_spool_filename(jcr, &name, bs->m_fd);
750    fclose(bs->m_spool_fd);
751    unlink(name);
752    free_pool_memory(name);
753    bs->m_spool_fd = NULL;
754    bs->clear_spooling();
755    return true;
756 }