]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
47654eea350adc349246d62d2c7785b513a205b0
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2004-2012 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  *  Spooling code
30  *
31  *      Kern Sibbald, March 2004
32  *
33  */
34
35 #include "bacula.h"
36 #include "stored.h"
37
38 /* Forward referenced subroutines */
39 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
40 static bool open_data_spool_file(DCR *dcr);
41 static bool close_data_spool_file(DCR *dcr);
42 static bool despool_data(DCR *dcr, bool commit);
43 static int  read_block_from_spool_file(DCR *dcr);
44 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
45 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
46 static bool write_spool_header(DCR *dcr);
47 static bool write_spool_data(DCR *dcr);
48
49 struct spool_stats_t {
50    uint32_t data_jobs;                /* current jobs spooling data */
51    uint32_t attr_jobs;
52    uint32_t total_data_jobs;          /* total jobs to have spooled data */
53    uint32_t total_attr_jobs;
54    int64_t max_data_size;             /* max data size */
55    int64_t max_attr_size;
56    int64_t data_size;                 /* current data size (all jobs running) */
57    int64_t attr_size;
58 };
59
60 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
61 spool_stats_t spool_stats;
62
63 /*
64  * Header for data spool record */
65 struct spool_hdr {
66    int32_t  FirstIndex;               /* FirstIndex for buffer */
67    int32_t  LastIndex;                /* LastIndex for buffer */
68    uint32_t len;                      /* length of next buffer */
69 };
70
71 enum {
72    RB_EOT = 1,
73    RB_ERROR,
74    RB_OK
75 };
76
77 void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
78 {
79    char ed1[30], ed2[30];
80    POOL_MEM msg(PM_MESSAGE);
81    int len;
82
83    len = Mmsg(msg, _("Spooling statistics:\n"));
84
85    if (spool_stats.data_jobs || spool_stats.max_data_size) {
86       len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
87          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
88          spool_stats.total_data_jobs,
89          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
90
91       sendit(msg.c_str(), len, arg);
92    }
93    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
94       len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
95          spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
96          spool_stats.total_attr_jobs,
97          edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
98    
99       sendit(msg.c_str(), len, arg);
100    }
101 }
102
103 bool begin_data_spool(DCR *dcr)
104 {
105    bool stat = true;
106    if (!dcr->dev->is_dvd() && dcr->jcr->spool_data) {
107       Dmsg0(100, "Turning on data spooling\n");
108       dcr->spool_data = true;
109       stat = open_data_spool_file(dcr);
110       if (stat) {
111          dcr->spooling = true;
112          Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
113          P(mutex);
114          spool_stats.data_jobs++;
115          V(mutex);
116       }
117    }
118    return stat;
119 }
120
121 bool discard_data_spool(DCR *dcr)
122 {
123    if (dcr->spooling) {
124       Dmsg0(100, "Data spooling discarded\n");
125       return close_data_spool_file(dcr);
126    }
127    return true;
128 }
129
130 bool commit_data_spool(DCR *dcr)
131 {
132    bool stat;
133
134    if (dcr->spooling) {
135       Dmsg0(100, "Committing spooled data\n");
136       stat = despool_data(dcr, true /*commit*/);
137       if (!stat) {
138          Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
139          close_data_spool_file(dcr);
140          return false;
141       }
142       return close_data_spool_file(dcr);
143    }
144    return true;
145 }
146
147 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
148 {
149    const char *dir;
150    if (dcr->dev->device->spool_directory) {
151       dir = dcr->dev->device->spool_directory;
152    } else {
153       dir = working_directory;
154    }
155    Mmsg(name, "%s/%s.data.%u.%s.%s.spool", dir, my_name, dcr->jcr->JobId,
156         dcr->jcr->Job, dcr->device->hdr.name);
157 }
158
159
160 static bool open_data_spool_file(DCR *dcr)
161 {
162    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
163    int spool_fd;
164
165    make_unique_data_spool_filename(dcr, &name);
166    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
167       dcr->spool_fd = spool_fd;
168       dcr->jcr->spool_attributes = true;
169    } else {
170       berrno be;
171       Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
172            be.bstrerror());
173       free_pool_memory(name);
174       return false;
175    }
176    Dmsg1(100, "Created spool file: %s\n", name);
177    free_pool_memory(name);
178    return true;
179 }
180
181 static bool close_data_spool_file(DCR *dcr)
182 {
183    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
184
185    P(mutex);
186    spool_stats.data_jobs--;
187    spool_stats.total_data_jobs++;
188    if (spool_stats.data_size < dcr->job_spool_size) {
189       spool_stats.data_size = 0;
190    } else {
191       spool_stats.data_size -= dcr->job_spool_size;
192    }
193    dcr->job_spool_size = 0;
194    V(mutex);
195
196    make_unique_data_spool_filename(dcr, &name);
197    close(dcr->spool_fd);
198    dcr->spool_fd = -1;
199    dcr->spooling = false;
200    unlink(name);
201    Dmsg1(100, "Deleted spool file: %s\n", name);
202    free_pool_memory(name);
203    return true;
204 }
205
206 static const char *spool_name = "*spool*";
207
208 /*
209  * NB! This routine locks the device, but if committing will
210  *     not unlock it. If not committing, it will be unlocked.
211  */
212 static bool despool_data(DCR *dcr, bool commit)
213 {
214    DEVICE *rdev;
215    DCR *rdcr;
216    bool ok = true;
217    DEV_BLOCK *block;
218    JCR *jcr = dcr->jcr;
219    int stat;
220    char ec1[50];
221    BSOCK *dir = jcr->dir_bsock;
222
223    Dmsg0(100, "Despooling data\n");
224    if (jcr->dcr->job_spool_size == 0) {
225       Jmsg(jcr, M_WARNING, 0, _("Despooling zero bytes. Your disk is probably FULL!\n"));
226    }
227
228    /*
229     * Commit means that the job is done, so we commit, otherwise, we
230     *  are despooling because of user spool size max or some error  
231     *  (e.g. filesystem full).
232     */
233    if (commit) {
234       Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
235          jcr->dcr->VolumeName,
236          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
237       jcr->setJobStatus(JS_DataCommitting);
238    } else {
239       Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
240          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
241       jcr->setJobStatus(JS_DataDespooling);
242    }
243    jcr->sendJobStatus(JS_DataDespooling);
244    dcr->despool_wait = true;
245    dcr->spooling = false;
246    /*
247     * We work with device blocked, but not locked so that
248     *  other threads -- e.g. reservations can lock the device
249     *  structure.
250     */
251    dcr->dblock(BST_DESPOOLING);
252    dcr->despool_wait = false;
253    dcr->despooling = true;
254
255    /*
256     * This is really quite kludgy and should be fixed some time.
257     * We create a dev structure to read from the spool file
258     * in rdev and rdcr.
259     */
260    rdev = (DEVICE *)malloc(sizeof(DEVICE));
261    memset(rdev, 0, sizeof(DEVICE));
262    rdev->dev_name = get_memory(strlen(spool_name)+1);
263    bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
264    rdev->errmsg = get_pool_memory(PM_EMSG);
265    *rdev->errmsg = 0;
266    rdev->max_block_size = dcr->dev->max_block_size;
267    rdev->min_block_size = dcr->dev->min_block_size;
268    rdev->device = dcr->dev->device;
269    rdcr = new_dcr(jcr, NULL, rdev);
270    rdcr->spool_fd = dcr->spool_fd;
271    block = dcr->block;                /* save block */
272    dcr->block = rdcr->block;          /* make read and write block the same */
273
274    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
275    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
276
277 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
278    posix_fadvise(rdcr->spool_fd, 0, 0, POSIX_FADV_WILLNEED);
279 #endif
280
281    /* Add run time, to get current wait time */
282    int32_t despool_start = time(NULL) - jcr->run_time;
283
284    set_new_file_parameters(dcr);
285
286    for ( ; ok; ) {
287       if (job_canceled(jcr)) {
288          ok = false;
289          break;
290       }
291       stat = read_block_from_spool_file(rdcr);
292       if (stat == RB_EOT) {
293          break;
294       } else if (stat == RB_ERROR) {
295          ok = false;
296          break;
297       }
298       ok = dcr->write_block_to_device();
299       if (!ok) {
300          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
301                dcr->dev->print_name(), dcr->dev->bstrerror());
302          Dmsg2(000, "Fatal append error on device %s: ERR=%s\n",
303                dcr->dev->print_name(), dcr->dev->bstrerror());
304          /* Force in case Incomplete set */
305          jcr->forceJobStatus(JS_FatalError);
306       }
307       Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
308    }
309
310    /*
311     * If this Job is incomplete, we need to backup the FileIndex
312     *  to the last correctly saved file so that the JobMedia
313     *  LastIndex is correct.
314     */
315    if (jcr->is_JobStatus(JS_Incomplete)) {
316       dcr->VolLastIndex = dir->get_FileIndex();
317       Dmsg1(100, "======= Set FI=%ld\n", dir->get_FileIndex());
318    }
319
320    if (!dir_create_jobmedia_record(dcr)) {
321       Jmsg2(jcr, M_FATAL, 0, _("Could not create JobMedia record for Volume=\"%s\" Job=%s\n"),
322          dcr->getVolCatName(), jcr->Job);
323       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
324    }
325    /* Set new file/block parameters for current dcr */
326    set_new_file_parameters(dcr);
327
328    /*
329     * Subtracting run_time give us elapsed time - wait_time since 
330     * we started despooling. Note, don't use time_t as it is 32 or 64
331     * bits depending on the OS and doesn't edit with %d
332     */
333    int32_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
334
335    if (despool_elapsed <= 0) {
336       despool_elapsed = 1;
337    }
338
339    Jmsg(jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s Bytes/second\n"),
340          despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
341          edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
342
343    dcr->block = block;                /* reset block */
344
345    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
346    if (ftruncate(rdcr->spool_fd, 0) != 0) {
347       berrno be;
348       Jmsg(jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
349          be.bstrerror());
350       /* Note, try continuing despite ftruncate problem */
351    }
352
353    P(mutex);
354    if (spool_stats.data_size < dcr->job_spool_size) {
355       spool_stats.data_size = 0;
356    } else {
357       spool_stats.data_size -= dcr->job_spool_size;
358    }
359    V(mutex);
360    P(dcr->dev->spool_mutex);
361    dcr->dev->spool_size -= dcr->job_spool_size;
362    dcr->job_spool_size = 0;            /* zap size in input dcr */
363    V(dcr->dev->spool_mutex);
364    free_memory(rdev->dev_name);
365    free_pool_memory(rdev->errmsg);
366    /* Be careful to NULL the jcr and free rdev after free_dcr() */
367    rdcr->jcr = NULL;
368    rdcr->set_dev(NULL);
369    free_dcr(rdcr);
370    free(rdev);
371    dcr->spooling = true;           /* turn on spooling again */
372    dcr->despooling = false;
373    /*
374     * Note, if committing we leave the device blocked. It will be removed in
375     *  release_device();
376     */
377    if (!commit) {
378       dcr->dev->dunblock();
379    }
380    jcr->sendJobStatus(JS_Running);
381    return ok;
382 }
383
384 /*
385  * Read a block from the spool file
386  *
387  *  Returns RB_OK on success
388  *          RB_EOT when file done
389  *          RB_ERROR on error
390  */
391 static int read_block_from_spool_file(DCR *dcr)
392 {
393    uint32_t rlen;
394    ssize_t stat;
395    spool_hdr hdr;
396    DEV_BLOCK *block = dcr->block;
397    JCR *jcr = dcr->jcr;
398
399    rlen = sizeof(hdr);
400    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
401    if (stat == 0) {
402       Dmsg0(100, "EOT on spool read.\n");
403       return RB_EOT;
404    } else if (stat != (ssize_t)rlen) {
405       if (stat == -1) {
406          berrno be;
407          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
408               be.bstrerror());
409       } else {
410          Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
411          Jmsg2(jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
412       }
413       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
414       return RB_ERROR;
415    }
416    rlen = hdr.len;
417    if (rlen > block->buf_len) {
418       Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
419       Jmsg2(jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
420       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
421       return RB_ERROR;
422    }
423    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
424    if (stat != (ssize_t)rlen) {
425       Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
426       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
427       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
428       return RB_ERROR;
429    }
430    /* Setup write pointers */
431    block->binbuf = rlen;
432    block->bufp = block->buf + block->binbuf;
433    block->FirstIndex = hdr.FirstIndex;
434    block->LastIndex = hdr.LastIndex;
435    block->VolSessionId = dcr->jcr->VolSessionId;
436    block->VolSessionTime = dcr->jcr->VolSessionTime;
437    Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
438    return RB_OK;
439 }
440
441 /*
442  * Write a block to the spool file
443  *
444  *  Returns: true on success or EOT
445  *           false on hard error
446  */
447 bool write_block_to_spool_file(DCR *dcr)
448 {
449    uint32_t wlen, hlen;               /* length to write */
450    bool despool = false;
451    DEV_BLOCK *block = dcr->block;
452
453    if (job_canceled(dcr->jcr)) {
454       return false;
455    }
456    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
457    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
458       return true;
459    }
460
461    hlen = sizeof(spool_hdr);
462    wlen = block->binbuf;
463    P(dcr->dev->spool_mutex);
464    dcr->job_spool_size += hlen + wlen;
465    dcr->dev->spool_size += hlen + wlen;
466    if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
467        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
468       despool = true;
469    }
470    V(dcr->dev->spool_mutex);
471    P(mutex);
472    spool_stats.data_size += hlen + wlen;
473    if (spool_stats.data_size > spool_stats.max_data_size) {
474       spool_stats.max_data_size = spool_stats.data_size;
475    }
476    V(mutex);
477    if (despool) {
478 #ifdef xDEBUG
479       char ec1[30], ec2[30], ec3[30], ec4[30];
480       Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
481             "max_job_size=%s job_size=%s\n",
482             edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
483             edit_uint64_with_commas(dcr->job_spool_size, ec2),
484             edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
485             edit_uint64_with_commas(dcr->dev->spool_size, ec4));
486 #endif
487       Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
488       if (!despool_data(dcr, false)) {
489          Pmsg0(000, _("Bad return from despool in write_block.\n"));
490          return false;
491       }
492       /* Despooling cleared these variables so reset them */
493       P(dcr->dev->spool_mutex);
494       dcr->job_spool_size += hlen + wlen;
495       dcr->dev->spool_size += hlen + wlen;
496       V(dcr->dev->spool_mutex);
497       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
498    }
499
500
501    if (!write_spool_header(dcr)) {
502       return false;
503    }
504    if (!write_spool_data(dcr)) {
505      return false;
506    }
507
508    Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
509    empty_block(block);
510    return true;
511 }
512
513 static bool write_spool_header(DCR *dcr)
514 {
515    spool_hdr hdr;
516    ssize_t stat;
517    DEV_BLOCK *block = dcr->block;
518    JCR *jcr = dcr->jcr;
519
520    hdr.FirstIndex = block->FirstIndex;
521    hdr.LastIndex = block->LastIndex;
522    hdr.len = block->binbuf;
523
524    /* Write header */
525    for (int retry=0; retry<=1; retry++) {
526       stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
527       if (stat == -1) {
528          berrno be;
529          Jmsg(jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
530               be.bstrerror());
531          jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
532       }
533       if (stat != (ssize_t)sizeof(hdr)) {
534          Jmsg(jcr, M_ERROR, 0, _("Error writing header to spool file."
535               " Disk probably full. Attempting recovery. Wanted to write=%d got=%d\n"),
536               (int)stat, (int)sizeof(hdr));
537          /* If we wrote something, truncate it, then despool */
538          if (stat != -1) {
539 #if defined(HAVE_WIN32)
540             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
541 #else
542             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
543 #endif
544             if (ftruncate(dcr->spool_fd, pos - stat) != 0) {
545                berrno be;
546                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
547                   be.bstrerror());
548               /* Note, try continuing despite ftruncate problem */
549             }
550          }
551          if (!despool_data(dcr, false)) {
552             Jmsg(jcr, M_FATAL, 0, _("Fatal despooling error."));
553             jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
554             return false;
555          }
556          continue;                    /* try again */
557       }
558       return true;
559    }
560    Jmsg(jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
561    jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
562    return false;
563 }
564
565 static bool write_spool_data(DCR *dcr)
566 {
567    ssize_t stat;
568    DEV_BLOCK *block = dcr->block;
569    JCR *jcr = dcr->jcr;
570
571    /* Write data */
572    for (int retry=0; retry<=1; retry++) {
573       stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
574       if (stat == -1) {
575          berrno be;
576          Jmsg(jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
577               be.bstrerror());
578          jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
579       }
580       if (stat != (ssize_t)block->binbuf) {
581          /*
582           * If we wrote something, truncate it and the header, then despool
583           */
584          if (stat != -1) {
585 #if defined(HAVE_WIN32)
586             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
587 #else
588             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
589 #endif
590             if (ftruncate(dcr->spool_fd, pos - stat - sizeof(spool_hdr)) != 0) {
591                berrno be;
592                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
593                   be.bstrerror());
594                /* Note, try continuing despite ftruncate problem */
595             }
596          }
597          if (!despool_data(dcr, false)) {
598             Jmsg(jcr, M_FATAL, 0, _("Fatal despooling error."));
599             jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
600             return false;
601          }
602          if (!write_spool_header(dcr)) {
603             return false;
604          }
605          continue;                    /* try again */
606       }
607       return true;
608    }
609    Jmsg(jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
610    jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
611    return false;
612 }
613
614
615
616 bool are_attributes_spooled(JCR *jcr)
617 {
618    return jcr->spool_attributes && jcr->dir_bsock->m_spool_fd;
619 }
620
621 /*
622  * Create spool file for attributes.
623  *  This is done by "attaching" to the bsock, and when
624  *  it is called, the output is written to a file.
625  *  The actual spooling is turned on and off in
626  *  append.c only during writing of the attributes.
627  */
628 bool begin_attribute_spool(JCR *jcr)
629 {
630    if (!jcr->no_attributes && jcr->spool_attributes) {
631       return open_attr_spool_file(jcr, jcr->dir_bsock);
632    }
633    return true;
634 }
635
636 bool discard_attribute_spool(JCR *jcr)
637 {
638    if (are_attributes_spooled(jcr)) {
639       return close_attr_spool_file(jcr, jcr->dir_bsock);
640    }
641    return true;
642 }
643
644 static void update_attr_spool_size(ssize_t size)
645 {
646    P(mutex);
647    if (size > 0) {
648      if ((spool_stats.attr_size - size) > 0) {
649         spool_stats.attr_size -= size;
650      } else {
651         spool_stats.attr_size = 0;
652      }
653    }
654    V(mutex);
655 }
656
657 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
658 {
659    Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
660       jcr->Job, fd);
661 }
662
663 /*
664  * Tell Director where to find the attributes spool file 
665  *  Note, if we are not on the same machine, the Director will
666  *  return an error, and the higher level routine will transmit
667  *  the data record by record -- using bsock->despool().
668  */
669 static bool blast_attr_spool_file(JCR *jcr, boffset_t size)
670 {
671    /* send full spool file name */
672    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
673    make_unique_spool_filename(jcr, &name, jcr->dir_bsock->m_fd);
674    bash_spaces(name);
675    jcr->dir_bsock->fsend("BlastAttr Job=%s File=%s\n", jcr->Job, name);
676    free_pool_memory(name);
677    
678    if (jcr->dir_bsock->recv() <= 0) {
679       Jmsg(jcr, M_FATAL, 0, _("Network error on BlastAttributes.\n"));
680       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
681       return false;
682    }
683    
684    if (!bstrcmp(jcr->dir_bsock->msg, "1000 OK BlastAttr\n")) {
685       return false;
686    }
687    return true;
688 }
689
690 bool commit_attribute_spool(JCR *jcr)
691 {
692    boffset_t size, data_end;
693    char ec1[30];
694    char tbuf[100];
695    BSOCK *dir;
696
697    Dmsg1(100, "Commit attributes at %s\n", bstrftimes(tbuf, sizeof(tbuf),
698          (utime_t)time(NULL)));
699    if (are_attributes_spooled(jcr)) {
700       dir = jcr->dir_bsock;
701       if (fseeko(dir->m_spool_fd, 0, SEEK_END) != 0) {
702          berrno be;
703          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
704               be.bstrerror());
705          jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
706          goto bail_out;
707       }
708       size = ftello(dir->m_spool_fd);
709       if (jcr->is_JobStatus(JS_Incomplete)) {
710          data_end = dir->get_data_end();
711          /* Check and truncate to last valid data_end if necssary */
712          if (size > data_end) {
713             if (ftruncate(fileno(dir->m_spool_fd), data_end) != 0) {
714                berrno be;
715                Jmsg(jcr, M_FATAL, 0, _("Truncate on attributes file failed: ERR=%s\n"),
716                     be.bstrerror());
717                jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
718                goto bail_out;
719             }
720             Dmsg2(100, "=== Attrib spool truncated from %lld to %lld\n", 
721                   size, data_end);
722             size = data_end;
723          }
724       }
725       if (size < 0) {
726          berrno be;
727          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
728               be.bstrerror());
729          jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
730          goto bail_out;
731       }
732       P(mutex);
733       if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
734          spool_stats.max_attr_size = spool_stats.attr_size + size;
735       }
736       spool_stats.attr_size += size;
737       V(mutex);
738       jcr->sendJobStatus(JS_AttrDespooling);
739       Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
740             edit_uint64_with_commas(size, ec1));
741
742       if (!blast_attr_spool_file(jcr, size)) {
743          /* Can't read spool file from director side,
744           * send content over network.
745           */
746          dir->despool(update_attr_spool_size, size);
747       }
748       return close_attr_spool_file(jcr, dir);
749    }
750    return true;
751
752 bail_out:
753    close_attr_spool_file(jcr, dir);
754    return false;
755 }
756
757 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
758 {
759    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
760
761    make_unique_spool_filename(jcr, &name, bs->m_fd);
762    bs->m_spool_fd = fopen(name, "w+b");
763    if (!bs->m_spool_fd) {
764       berrno be;
765       Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
766            be.bstrerror());
767       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
768       free_pool_memory(name);
769       return false;
770    }
771    P(mutex);
772    spool_stats.attr_jobs++;
773    V(mutex);
774    free_pool_memory(name);
775    return true;
776 }
777
778 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
779 {
780    POOLMEM *name;
781
782    char tbuf[100];
783
784    Dmsg1(100, "Close attr spool file at %s\n", bstrftimes(tbuf, sizeof(tbuf),
785          (utime_t)time(NULL)));
786    if (!bs->m_spool_fd) {
787       return true;
788    }
789    name = get_pool_memory(PM_MESSAGE);
790    P(mutex);
791    spool_stats.attr_jobs--;
792    spool_stats.total_attr_jobs++;
793    V(mutex);
794    make_unique_spool_filename(jcr, &name, bs->m_fd);
795    fclose(bs->m_spool_fd);
796    unlink(name);
797    free_pool_memory(name);
798    bs->m_spool_fd = NULL;
799    bs->clear_spooling();
800    return true;
801 }