]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
update configure
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2004-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  *  Spooling code
30  *
31  *      Kern Sibbald, March 2004
32  *
33  */
34
35 #include "bacula.h"
36 #include "stored.h"
37
38 /* Forward referenced subroutines */
39 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
40 static bool open_data_spool_file(DCR *dcr);
41 static bool close_data_spool_file(DCR *dcr);
42 static bool despool_data(DCR *dcr, bool commit);
43 static int  read_block_from_spool_file(DCR *dcr);
44 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
45 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
46 static bool write_spool_header(DCR *dcr);
47 static bool write_spool_data(DCR *dcr);
48
49 struct spool_stats_t {
50    uint32_t data_jobs;                /* current jobs spooling data */
51    uint32_t attr_jobs;
52    uint32_t total_data_jobs;          /* total jobs to have spooled data */
53    uint32_t total_attr_jobs;
54    int64_t max_data_size;             /* max data size */
55    int64_t max_attr_size;
56    int64_t data_size;                 /* current data size (all jobs running) */
57    int64_t attr_size;
58 };
59
60 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
61 spool_stats_t spool_stats;
62
63 /*
64  * Header for data spool record */
65 struct spool_hdr {
66    int32_t  FirstIndex;               /* FirstIndex for buffer */
67    int32_t  LastIndex;                /* LastIndex for buffer */
68    uint32_t len;                      /* length of next buffer */
69 };
70
71 enum {
72    RB_EOT = 1,
73    RB_ERROR,
74    RB_OK
75 };
76
77 void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
78 {
79    char ed1[30], ed2[30];
80    POOL_MEM msg(PM_MESSAGE);
81    int len;
82
83    len = Mmsg(msg, _("Spooling statistics:\n"));
84
85    if (spool_stats.data_jobs || spool_stats.max_data_size) {
86       len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
87          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
88          spool_stats.total_data_jobs,
89          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
90
91       sendit(msg.c_str(), len, arg);
92    }
93    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
94       len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
95          spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
96          spool_stats.total_attr_jobs,
97          edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
98    
99       sendit(msg.c_str(), len, arg);
100    }
101 }
102
103 bool begin_data_spool(DCR *dcr)
104 {
105    bool stat = true;
106    if (!dcr->dev->is_dvd() && dcr->jcr->spool_data) {
107       Dmsg0(100, "Turning on data spooling\n");
108       dcr->spool_data = true;
109       stat = open_data_spool_file(dcr);
110       if (stat) {
111          dcr->spooling = true;
112          Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
113          P(mutex);
114          spool_stats.data_jobs++;
115          V(mutex);
116       }
117    }
118    return stat;
119 }
120
121 bool discard_data_spool(DCR *dcr)
122 {
123    if (dcr->spooling) {
124       Dmsg0(100, "Data spooling discarded\n");
125       return close_data_spool_file(dcr);
126    }
127    return true;
128 }
129
130 bool commit_data_spool(DCR *dcr)
131 {
132    bool stat;
133
134    if (dcr->spooling) {
135       Dmsg0(100, "Committing spooled data\n");
136       stat = despool_data(dcr, true /*commit*/);
137       if (!stat) {
138          Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
139          close_data_spool_file(dcr);
140          return false;
141       }
142       return close_data_spool_file(dcr);
143    }
144    return true;
145 }
146
147 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
148 {
149    const char *dir;
150    if (dcr->dev->device->spool_directory) {
151       dir = dcr->dev->device->spool_directory;
152    } else {
153       dir = working_directory;
154    }
155    Mmsg(name, "%s/%s.data.%u.%s.%s.spool", dir, my_name, dcr->jcr->JobId,
156         dcr->jcr->Job, dcr->device->hdr.name);
157 }
158
159
160 static bool open_data_spool_file(DCR *dcr)
161 {
162    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
163    int spool_fd;
164
165    make_unique_data_spool_filename(dcr, &name);
166    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
167       dcr->spool_fd = spool_fd;
168       dcr->jcr->spool_attributes = true;
169    } else {
170       berrno be;
171       Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
172            be.bstrerror());
173       free_pool_memory(name);
174       return false;
175    }
176    Dmsg1(100, "Created spool file: %s\n", name);
177    free_pool_memory(name);
178    return true;
179 }
180
181 static bool close_data_spool_file(DCR *dcr)
182 {
183    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
184
185    P(mutex);
186    spool_stats.data_jobs--;
187    spool_stats.total_data_jobs++;
188    if (spool_stats.data_size < dcr->job_spool_size) {
189       spool_stats.data_size = 0;
190    } else {
191       spool_stats.data_size -= dcr->job_spool_size;
192    }
193    dcr->job_spool_size = 0;
194    V(mutex);
195
196    make_unique_data_spool_filename(dcr, &name);
197    close(dcr->spool_fd);
198    dcr->spool_fd = -1;
199    dcr->spooling = false;
200    unlink(name);
201    Dmsg1(100, "Deleted spool file: %s\n", name);
202    free_pool_memory(name);
203    return true;
204 }
205
206 static const char *spool_name = "*spool*";
207
208 /*
209  * NB! This routine locks the device, but if committing will
210  *     not unlock it. If not committing, it will be unlocked.
211  */
212 static bool despool_data(DCR *dcr, bool commit)
213 {
214    DEVICE *rdev;
215    DCR *rdcr;
216    bool ok = true;
217    DEV_BLOCK *block;
218    JCR *jcr = dcr->jcr;
219    int stat;
220    char ec1[50];
221    BSOCK *dir = jcr->dir_bsock;
222
223    Dmsg0(100, "Despooling data\n");
224    if (jcr->dcr->job_spool_size == 0) {
225       Jmsg(jcr, M_WARNING, 0, _("Despooling zero bytes. Your disk is probably FULL!\n"));
226    }
227
228    /*
229     * Commit means that the job is done, so we commit, otherwise, we
230     *  are despooling because of user spool size max or some error  
231     *  (e.g. filesystem full).
232     */
233    if (commit) {
234       Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
235          jcr->dcr->VolumeName,
236          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
237       jcr->setJobStatus(JS_DataCommitting);
238    } else {
239       Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
240          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
241       jcr->setJobStatus(JS_DataDespooling);
242    }
243    jcr->setJobStatus(JS_DataDespooling);
244    dir_send_job_status(jcr);
245    dcr->despool_wait = true;
246    dcr->spooling = false;
247    /*
248     * We work with device blocked, but not locked so that
249     *  other threads -- e.g. reservations can lock the device
250     *  structure.
251     */
252    dcr->dblock(BST_DESPOOLING);
253    dcr->despool_wait = false;
254    dcr->despooling = true;
255
256    /*
257     * This is really quite kludgy and should be fixed some time.
258     * We create a dev structure to read from the spool file
259     * in rdev and rdcr.
260     */
261    rdev = (DEVICE *)malloc(sizeof(DEVICE));
262    memset(rdev, 0, sizeof(DEVICE));
263    rdev->dev_name = get_memory(strlen(spool_name)+1);
264    bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
265    rdev->errmsg = get_pool_memory(PM_EMSG);
266    *rdev->errmsg = 0;
267    rdev->max_block_size = dcr->dev->max_block_size;
268    rdev->min_block_size = dcr->dev->min_block_size;
269    rdev->device = dcr->dev->device;
270    rdcr = new_dcr(jcr, NULL, rdev);
271    rdcr->spool_fd = dcr->spool_fd;
272    block = dcr->block;                /* save block */
273    dcr->block = rdcr->block;          /* make read and write block the same */
274
275    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
276    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
277
278 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
279    posix_fadvise(rdcr->spool_fd, 0, 0, POSIX_FADV_WILLNEED);
280 #endif
281
282    /* Add run time, to get current wait time */
283    int32_t despool_start = time(NULL) - jcr->run_time;
284
285    set_new_file_parameters(dcr);
286
287    for ( ; ok; ) {
288       if (job_canceled(jcr)) {
289          ok = false;
290          break;
291       }
292       stat = read_block_from_spool_file(rdcr);
293       if (stat == RB_EOT) {
294          break;
295       } else if (stat == RB_ERROR) {
296          ok = false;
297          break;
298       }
299       ok = write_block_to_device(dcr);
300       if (!ok) {
301          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
302                dcr->dev->print_name(), dcr->dev->bstrerror());
303          Dmsg2(000, "Fatal append error on device %s: ERR=%s\n",
304                dcr->dev->print_name(), dcr->dev->bstrerror());
305          /* Force in case Incomplete set */
306          jcr->forceJobStatus(JS_FatalError);
307       }
308       Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
309    }
310
311    /*
312     * If this Job is incomplete, we need to backup the FileIndex
313     *  to the last correctly saved file so that the JobMedia
314     *  LastIndex is correct.
315     */
316    if (jcr->is_JobStatus(JS_Incomplete)) {
317       dcr->VolLastIndex = dir->get_FileIndex();
318       Dmsg1(100, "======= Set FI=%ld\n", dir->get_FileIndex());
319    }
320
321    if (!dir_create_jobmedia_record(dcr)) {
322       Jmsg2(jcr, M_FATAL, 0, _("Could not create JobMedia record for Volume=\"%s\" Job=%s\n"),
323          dcr->getVolCatName(), jcr->Job);
324       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
325    }
326    /* Set new file/block parameters for current dcr */
327    set_new_file_parameters(dcr);
328
329    /*
330     * Subtracting run_time give us elapsed time - wait_time since 
331     * we started despooling. Note, don't use time_t as it is 32 or 64
332     * bits depending on the OS and doesn't edit with %d
333     */
334    int32_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
335
336    if (despool_elapsed <= 0) {
337       despool_elapsed = 1;
338    }
339
340    Jmsg(jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s Bytes/second\n"),
341          despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
342          edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
343
344    dcr->block = block;                /* reset block */
345
346    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
347    if (ftruncate(rdcr->spool_fd, 0) != 0) {
348       berrno be;
349       Jmsg(jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
350          be.bstrerror());
351       /* Note, try continuing despite ftruncate problem */
352    }
353
354    P(mutex);
355    if (spool_stats.data_size < dcr->job_spool_size) {
356       spool_stats.data_size = 0;
357    } else {
358       spool_stats.data_size -= dcr->job_spool_size;
359    }
360    V(mutex);
361    P(dcr->dev->spool_mutex);
362    dcr->dev->spool_size -= dcr->job_spool_size;
363    dcr->job_spool_size = 0;            /* zap size in input dcr */
364    V(dcr->dev->spool_mutex);
365    free_memory(rdev->dev_name);
366    free_pool_memory(rdev->errmsg);
367    /* Be careful to NULL the jcr and free rdev after free_dcr() */
368    rdcr->jcr = NULL;
369    rdcr->dev = NULL;
370    free_dcr(rdcr);
371    free(rdev);
372    dcr->spooling = true;           /* turn on spooling again */
373    dcr->despooling = false;
374    /*
375     * Note, if committing we leave the device blocked. It will be removed in
376     *  release_device();
377     */
378    if (!commit) {
379       dcr->dev->dunblock();
380    }
381    jcr->setJobStatus(JS_Running);
382    dir_send_job_status(jcr);
383    return ok;
384 }
385
386 /*
387  * Read a block from the spool file
388  *
389  *  Returns RB_OK on success
390  *          RB_EOT when file done
391  *          RB_ERROR on error
392  */
393 static int read_block_from_spool_file(DCR *dcr)
394 {
395    uint32_t rlen;
396    ssize_t stat;
397    spool_hdr hdr;
398    DEV_BLOCK *block = dcr->block;
399    JCR *jcr = dcr->jcr;
400
401    rlen = sizeof(hdr);
402    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
403    if (stat == 0) {
404       Dmsg0(100, "EOT on spool read.\n");
405       return RB_EOT;
406    } else if (stat != (ssize_t)rlen) {
407       if (stat == -1) {
408          berrno be;
409          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
410               be.bstrerror());
411       } else {
412          Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
413          Jmsg2(jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
414       }
415       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
416       return RB_ERROR;
417    }
418    rlen = hdr.len;
419    if (rlen > block->buf_len) {
420       Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
421       Jmsg2(jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
422       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
423       return RB_ERROR;
424    }
425    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
426    if (stat != (ssize_t)rlen) {
427       Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
428       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
429       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
430       return RB_ERROR;
431    }
432    /* Setup write pointers */
433    block->binbuf = rlen;
434    block->bufp = block->buf + block->binbuf;
435    block->FirstIndex = hdr.FirstIndex;
436    block->LastIndex = hdr.LastIndex;
437    block->VolSessionId = dcr->jcr->VolSessionId;
438    block->VolSessionTime = dcr->jcr->VolSessionTime;
439    Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
440    return RB_OK;
441 }
442
443 /*
444  * Write a block to the spool file
445  *
446  *  Returns: true on success or EOT
447  *           false on hard error
448  */
449 bool write_block_to_spool_file(DCR *dcr)
450 {
451    uint32_t wlen, hlen;               /* length to write */
452    bool despool = false;
453    DEV_BLOCK *block = dcr->block;
454
455    if (job_canceled(dcr->jcr)) {
456       return false;
457    }
458    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
459    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
460       return true;
461    }
462
463    hlen = sizeof(spool_hdr);
464    wlen = block->binbuf;
465    P(dcr->dev->spool_mutex);
466    dcr->job_spool_size += hlen + wlen;
467    dcr->dev->spool_size += hlen + wlen;
468    if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
469        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
470       despool = true;
471    }
472    V(dcr->dev->spool_mutex);
473    P(mutex);
474    spool_stats.data_size += hlen + wlen;
475    if (spool_stats.data_size > spool_stats.max_data_size) {
476       spool_stats.max_data_size = spool_stats.data_size;
477    }
478    V(mutex);
479    if (despool) {
480 #ifdef xDEBUG
481       char ec1[30], ec2[30], ec3[30], ec4[30];
482       Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
483             "max_job_size=%s job_size=%s\n",
484             edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
485             edit_uint64_with_commas(dcr->job_spool_size, ec2),
486             edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
487             edit_uint64_with_commas(dcr->dev->spool_size, ec4));
488 #endif
489       Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
490       if (!despool_data(dcr, false)) {
491          Pmsg0(000, _("Bad return from despool in write_block.\n"));
492          return false;
493       }
494       /* Despooling cleared these variables so reset them */
495       P(dcr->dev->spool_mutex);
496       dcr->job_spool_size += hlen + wlen;
497       dcr->dev->spool_size += hlen + wlen;
498       V(dcr->dev->spool_mutex);
499       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
500    }
501
502
503    if (!write_spool_header(dcr)) {
504       return false;
505    }
506    if (!write_spool_data(dcr)) {
507      return false;
508    }
509
510    Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
511    empty_block(block);
512    return true;
513 }
514
515 static bool write_spool_header(DCR *dcr)
516 {
517    spool_hdr hdr;
518    ssize_t stat;
519    DEV_BLOCK *block = dcr->block;
520    JCR *jcr = dcr->jcr;
521
522    hdr.FirstIndex = block->FirstIndex;
523    hdr.LastIndex = block->LastIndex;
524    hdr.len = block->binbuf;
525
526    /* Write header */
527    for (int retry=0; retry<=1; retry++) {
528       stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
529       if (stat == -1) {
530          berrno be;
531          Jmsg(jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
532               be.bstrerror());
533          jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
534       }
535       if (stat != (ssize_t)sizeof(hdr)) {
536          Jmsg(jcr, M_ERROR, 0, _("Error writing header to spool file."
537               " Disk probably full. Attempting recovery. Wanted to write=%d got=%d\n"),
538               (int)stat, (int)sizeof(hdr));
539          /* If we wrote something, truncate it, then despool */
540          if (stat != -1) {
541 #if defined(HAVE_WIN32)
542             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
543 #else
544             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
545 #endif
546             if (ftruncate(dcr->spool_fd, pos - stat) != 0) {
547                berrno be;
548                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
549                   be.bstrerror());
550               /* Note, try continuing despite ftruncate problem */
551             }
552          }
553          if (!despool_data(dcr, false)) {
554             Jmsg(jcr, M_FATAL, 0, _("Fatal despooling error."));
555             jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
556             return false;
557          }
558          continue;                    /* try again */
559       }
560       return true;
561    }
562    Jmsg(jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
563    jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
564    return false;
565 }
566
567 static bool write_spool_data(DCR *dcr)
568 {
569    ssize_t stat;
570    DEV_BLOCK *block = dcr->block;
571    JCR *jcr = dcr->jcr;
572
573    /* Write data */
574    for (int retry=0; retry<=1; retry++) {
575       stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
576       if (stat == -1) {
577          berrno be;
578          Jmsg(jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
579               be.bstrerror());
580          jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
581       }
582       if (stat != (ssize_t)block->binbuf) {
583          /*
584           * If we wrote something, truncate it and the header, then despool
585           */
586          if (stat != -1) {
587 #if defined(HAVE_WIN32)
588             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
589 #else
590             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
591 #endif
592             if (ftruncate(dcr->spool_fd, pos - stat - sizeof(spool_hdr)) != 0) {
593                berrno be;
594                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
595                   be.bstrerror());
596                /* Note, try continuing despite ftruncate problem */
597             }
598          }
599          if (!despool_data(dcr, false)) {
600             Jmsg(jcr, M_FATAL, 0, _("Fatal despooling error."));
601             jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
602             return false;
603          }
604          if (!write_spool_header(dcr)) {
605             return false;
606          }
607          continue;                    /* try again */
608       }
609       return true;
610    }
611    Jmsg(jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
612    jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
613    return false;
614 }
615
616
617
618 bool are_attributes_spooled(JCR *jcr)
619 {
620    return jcr->spool_attributes && jcr->dir_bsock->m_spool_fd;
621 }
622
623 /*
624  * Create spool file for attributes.
625  *  This is done by "attaching" to the bsock, and when
626  *  it is called, the output is written to a file.
627  *  The actual spooling is turned on and off in
628  *  append.c only during writing of the attributes.
629  */
630 bool begin_attribute_spool(JCR *jcr)
631 {
632    if (!jcr->no_attributes && jcr->spool_attributes) {
633       return open_attr_spool_file(jcr, jcr->dir_bsock);
634    }
635    return true;
636 }
637
638 bool discard_attribute_spool(JCR *jcr)
639 {
640    if (are_attributes_spooled(jcr)) {
641       return close_attr_spool_file(jcr, jcr->dir_bsock);
642    }
643    return true;
644 }
645
646 static void update_attr_spool_size(ssize_t size)
647 {
648    P(mutex);
649    if (size > 0) {
650      if ((spool_stats.attr_size - size) > 0) {
651         spool_stats.attr_size -= size;
652      } else {
653         spool_stats.attr_size = 0;
654      }
655    }
656    V(mutex);
657 }
658
659 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
660 {
661    Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
662       jcr->Job, fd);
663 }
664
665 /*
666  * Tell Director where to find the attributes spool file 
667  *  Note, if we are not on the same machine, the Director will
668  *  return an error, and the higher level routine will transmit
669  *  the data record by record -- using bsock->despool().
670  */
671 static bool blast_attr_spool_file(JCR *jcr, boffset_t size)
672 {
673    /* send full spool file name */
674    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
675    make_unique_spool_filename(jcr, &name, jcr->dir_bsock->m_fd);
676    bash_spaces(name);
677    jcr->dir_bsock->fsend("BlastAttr Job=%s File=%s\n", jcr->Job, name);
678    free_pool_memory(name);
679    
680    if (jcr->dir_bsock->recv() <= 0) {
681       Jmsg(jcr, M_FATAL, 0, _("Network error on BlastAttributes.\n"));
682       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
683       return false;
684    }
685    
686    if (!bstrcmp(jcr->dir_bsock->msg, "1000 OK BlastAttr\n")) {
687       return false;
688    }
689    return true;
690 }
691
692 bool commit_attribute_spool(JCR *jcr)
693 {
694    boffset_t size, data_end;
695    char ec1[30];
696    char tbuf[100];
697    BSOCK *dir;
698
699    Dmsg1(100, "Commit attributes at %s\n", bstrftimes(tbuf, sizeof(tbuf),
700          (utime_t)time(NULL)));
701    if (are_attributes_spooled(jcr)) {
702       dir = jcr->dir_bsock;
703       if (fseeko(dir->m_spool_fd, 0, SEEK_END) != 0) {
704          berrno be;
705          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
706               be.bstrerror());
707          jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
708          goto bail_out;
709       }
710       size = ftello(dir->m_spool_fd);
711       if (jcr->is_JobStatus(JS_Incomplete)) {
712          data_end = dir->get_data_end();
713          /* Check and truncate to last valid data_end if necssary */
714          if (size > data_end) {
715             if (ftruncate(fileno(dir->m_spool_fd), data_end) != 0) {
716                berrno be;
717                Jmsg(jcr, M_FATAL, 0, _("Truncate on attributes file failed: ERR=%s\n"),
718                     be.bstrerror());
719                jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
720                goto bail_out;
721             }
722             Dmsg2(100, "=== Attrib spool truncated from %lld to %lld\n", 
723                   size, data_end);
724             size = data_end;
725          }
726       }
727       if (size < 0) {
728          berrno be;
729          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
730               be.bstrerror());
731          jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
732          goto bail_out;
733       }
734       P(mutex);
735       if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
736          spool_stats.max_attr_size = spool_stats.attr_size + size;
737       }
738       spool_stats.attr_size += size;
739       V(mutex);
740       jcr->setJobStatus(JS_AttrDespooling);
741       dir_send_job_status(jcr);
742       Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
743             edit_uint64_with_commas(size, ec1));
744
745       if (!blast_attr_spool_file(jcr, size)) {
746          /* Can't read spool file from director side,
747           * send content over network.
748           */
749          dir->despool(update_attr_spool_size, size);
750       }
751       return close_attr_spool_file(jcr, dir);
752    }
753    return true;
754
755 bail_out:
756    close_attr_spool_file(jcr, dir);
757    return false;
758 }
759
760 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
761 {
762    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
763
764    make_unique_spool_filename(jcr, &name, bs->m_fd);
765    bs->m_spool_fd = fopen(name, "w+b");
766    if (!bs->m_spool_fd) {
767       berrno be;
768       Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
769            be.bstrerror());
770       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
771       free_pool_memory(name);
772       return false;
773    }
774    P(mutex);
775    spool_stats.attr_jobs++;
776    V(mutex);
777    free_pool_memory(name);
778    return true;
779 }
780
781 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
782 {
783    POOLMEM *name;
784
785    char tbuf[100];
786
787    Dmsg1(100, "Close attr spool file at %s\n", bstrftimes(tbuf, sizeof(tbuf),
788          (utime_t)time(NULL)));
789    if (!bs->m_spool_fd) {
790       return true;
791    }
792    name = get_pool_memory(PM_MESSAGE);
793    P(mutex);
794    spool_stats.attr_jobs--;
795    spool_stats.total_attr_jobs++;
796    V(mutex);
797    make_unique_spool_filename(jcr, &name, bs->m_fd);
798    fclose(bs->m_spool_fd);
799    unlink(name);
800    free_pool_memory(name);
801    bs->m_spool_fd = NULL;
802    bs->clear_spooling();
803    return true;
804 }