]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
395bd48c382b91d4a752d80d4f9dd8bc372e4a62
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2004-2012 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  *  Spooling code
30  *
31  *      Kern Sibbald, March 2004
32  *
33  */
34
35 #include "bacula.h"
36 #include "stored.h"
37
38 /* Forward referenced subroutines */
39 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
40 static bool open_data_spool_file(DCR *dcr);
41 static bool close_data_spool_file(DCR *dcr);
42 static bool despool_data(DCR *dcr, bool commit);
43 static int  read_block_from_spool_file(DCR *dcr);
44 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
45 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
46 static bool write_spool_header(DCR *dcr);
47 static bool write_spool_data(DCR *dcr);
48
49 struct spool_stats_t {
50    uint32_t data_jobs;                /* current jobs spooling data */
51    uint32_t attr_jobs;
52    uint32_t total_data_jobs;          /* total jobs to have spooled data */
53    uint32_t total_attr_jobs;
54    int64_t max_data_size;             /* max data size */
55    int64_t max_attr_size;
56    int64_t data_size;                 /* current data size (all jobs running) */
57    int64_t attr_size;
58 };
59
60 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
61 spool_stats_t spool_stats;
62
63 /*
64  * Header for data spool record */
65 struct spool_hdr {
66    int32_t  FirstIndex;               /* FirstIndex for buffer */
67    int32_t  LastIndex;                /* LastIndex for buffer */
68    uint32_t len;                      /* length of next buffer */
69 };
70
71 enum {
72    RB_EOT = 1,
73    RB_ERROR,
74    RB_OK
75 };
76
77 void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
78 {
79    char ed1[30], ed2[30];
80    POOL_MEM msg(PM_MESSAGE);
81    int len;
82
83    len = Mmsg(msg, _("Spooling statistics:\n"));
84
85    if (spool_stats.data_jobs || spool_stats.max_data_size) {
86       len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
87          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
88          spool_stats.total_data_jobs,
89          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
90
91       sendit(msg.c_str(), len, arg);
92    }
93    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
94       len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
95          spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
96          spool_stats.total_attr_jobs,
97          edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
98    
99       sendit(msg.c_str(), len, arg);
100    }
101 }
102
103 bool begin_data_spool(DCR *dcr)
104 {
105    bool stat = true;
106    if (!dcr->dev->is_dvd() && dcr->jcr->spool_data) {
107       Dmsg0(100, "Turning on data spooling\n");
108       dcr->spool_data = true;
109       stat = open_data_spool_file(dcr);
110       if (stat) {
111          dcr->spooling = true;
112          Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
113          P(mutex);
114          spool_stats.data_jobs++;
115          V(mutex);
116       }
117    }
118    return stat;
119 }
120
121 bool discard_data_spool(DCR *dcr)
122 {
123    if (dcr->spooling) {
124       Dmsg0(100, "Data spooling discarded\n");
125       return close_data_spool_file(dcr);
126    }
127    return true;
128 }
129
130 bool commit_data_spool(DCR *dcr)
131 {
132    bool stat;
133
134    if (dcr->spooling) {
135       Dmsg0(100, "Committing spooled data\n");
136       stat = despool_data(dcr, true /*commit*/);
137       if (!stat) {
138          Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
139          close_data_spool_file(dcr);
140          return false;
141       }
142       return close_data_spool_file(dcr);
143    }
144    return true;
145 }
146
147 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
148 {
149    const char *dir;
150    if (dcr->dev->device->spool_directory) {
151       dir = dcr->dev->device->spool_directory;
152    } else {
153       dir = working_directory;
154    }
155    Mmsg(name, "%s/%s.data.%u.%s.%s.spool", dir, my_name, dcr->jcr->JobId,
156         dcr->jcr->Job, dcr->device->hdr.name);
157 }
158
159
160 static bool open_data_spool_file(DCR *dcr)
161 {
162    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
163    int spool_fd;
164
165    make_unique_data_spool_filename(dcr, &name);
166    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
167       dcr->spool_fd = spool_fd;
168       dcr->jcr->spool_attributes = true;
169    } else {
170       berrno be;
171       Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
172            be.bstrerror());
173       free_pool_memory(name);
174       return false;
175    }
176    Dmsg1(100, "Created spool file: %s\n", name);
177    free_pool_memory(name);
178    return true;
179 }
180
181 static bool close_data_spool_file(DCR *dcr)
182 {
183    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
184
185    P(mutex);
186    spool_stats.data_jobs--;
187    spool_stats.total_data_jobs++;
188    if (spool_stats.data_size < dcr->job_spool_size) {
189       spool_stats.data_size = 0;
190    } else {
191       spool_stats.data_size -= dcr->job_spool_size;
192    }
193    dcr->job_spool_size = 0;
194    V(mutex);
195
196    make_unique_data_spool_filename(dcr, &name);
197    close(dcr->spool_fd);
198    dcr->spool_fd = -1;
199    dcr->spooling = false;
200    unlink(name);
201    Dmsg1(100, "Deleted spool file: %s\n", name);
202    free_pool_memory(name);
203    return true;
204 }
205
206 static const char *spool_name = "*spool*";
207
208 /*
209  * NB! This routine locks the device, but if committing will
210  *     not unlock it. If not committing, it will be unlocked.
211  */
212 static bool despool_data(DCR *dcr, bool commit)
213 {
214    DEVICE *rdev;
215    DCR *rdcr;
216    bool ok = true;
217    DEV_BLOCK *block;
218    JCR *jcr = dcr->jcr;
219    int stat;
220    char ec1[50];
221    BSOCK *dir = jcr->dir_bsock;
222
223    Dmsg0(100, "Despooling data\n");
224    if (jcr->dcr->job_spool_size == 0) {
225       Jmsg(jcr, M_WARNING, 0, _("Despooling zero bytes. Your disk is probably FULL!\n"));
226    }
227
228    /*
229     * Commit means that the job is done, so we commit, otherwise, we
230     *  are despooling because of user spool size max or some error  
231     *  (e.g. filesystem full).
232     */
233    if (commit) {
234       Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
235          jcr->dcr->VolumeName,
236          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
237       jcr->setJobStatus(JS_DataCommitting);
238    } else {
239       Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
240          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
241       jcr->setJobStatus(JS_DataDespooling);
242    }
243    jcr->sendJobStatus(JS_DataDespooling);
244    dcr->despool_wait = true;
245    dcr->spooling = false;
246    /*
247     * We work with device blocked, but not locked so that
248     *  other threads -- e.g. reservations can lock the device
249     *  structure.
250     */
251    dcr->dblock(BST_DESPOOLING);
252    dcr->despool_wait = false;
253    dcr->despooling = true;
254
255    /*
256     * This is really quite kludgy and should be fixed some time.
257     * We create a dev structure to read from the spool file
258     * in rdev and rdcr.
259     */
260    rdev = (DEVICE *)malloc(sizeof(DEVICE));
261    memset(rdev, 0, sizeof(DEVICE));
262    rdev->dev_name = get_memory(strlen(spool_name)+1);
263    bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
264    rdev->errmsg = get_pool_memory(PM_EMSG);
265    *rdev->errmsg = 0;
266    rdev->max_block_size = dcr->dev->max_block_size;
267    rdev->min_block_size = dcr->dev->min_block_size;
268    rdev->device = dcr->dev->device;
269    rdcr = new_dcr(jcr, NULL, rdev);
270    rdcr->spool_fd = dcr->spool_fd;
271    block = dcr->block;                /* save block */
272    dcr->block = rdcr->block;          /* make read and write block the same */
273
274    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
275    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
276
277 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
278    posix_fadvise(rdcr->spool_fd, 0, 0, POSIX_FADV_WILLNEED);
279 #endif
280
281    /* Add run time, to get current wait time */
282    int32_t despool_start = time(NULL) - jcr->run_time;
283
284    set_new_file_parameters(dcr);
285
286    for ( ; ok; ) {
287       if (job_canceled(jcr)) {
288          ok = false;
289          break;
290       }
291       stat = read_block_from_spool_file(rdcr);
292       if (stat == RB_EOT) {
293          break;
294       } else if (stat == RB_ERROR) {
295          ok = false;
296          break;
297       }
298       ok = dcr->write_block_to_device();
299       if (!ok) {
300          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
301                dcr->dev->print_name(), dcr->dev->bstrerror());
302          Dmsg2(000, "Fatal append error on device %s: ERR=%s\n",
303                dcr->dev->print_name(), dcr->dev->bstrerror());
304          /* Force in case Incomplete set */
305          jcr->forceJobStatus(JS_FatalError);
306       }
307       Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
308    }
309
310    /*
311     * If this Job is incomplete, we need to backup the FileIndex
312     *  to the last correctly saved file so that the JobMedia
313     *  LastIndex is correct.
314     */
315    if (jcr->is_JobStatus(JS_Incomplete)) {
316       dcr->VolLastIndex = dir->get_FileIndex();
317       Dmsg1(100, "======= Set FI=%ld\n", dir->get_FileIndex());
318    }
319
320    if (!dir_create_jobmedia_record(dcr)) {
321       Jmsg2(jcr, M_FATAL, 0, _("Could not create JobMedia record for Volume=\"%s\" Job=%s\n"),
322          dcr->getVolCatName(), jcr->Job);
323       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
324    }
325    /* Set new file/block parameters for current dcr */
326    set_new_file_parameters(dcr);
327
328    /*
329     * Subtracting run_time give us elapsed time - wait_time since 
330     * we started despooling. Note, don't use time_t as it is 32 or 64
331     * bits depending on the OS and doesn't edit with %d
332     */
333    int32_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
334
335    if (despool_elapsed <= 0) {
336       despool_elapsed = 1;
337    }
338
339    Jmsg(jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s Bytes/second\n"),
340          despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
341          edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
342
343    dcr->block = block;                /* reset block */
344
345    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
346    if (ftruncate(rdcr->spool_fd, 0) != 0) {
347       berrno be;
348       Jmsg(jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
349          be.bstrerror());
350       /* Note, try continuing despite ftruncate problem */
351    }
352
353    P(mutex);
354    if (spool_stats.data_size < dcr->job_spool_size) {
355       spool_stats.data_size = 0;
356    } else {
357       spool_stats.data_size -= dcr->job_spool_size;
358    }
359    V(mutex);
360    P(dcr->dev->spool_mutex);
361    dcr->dev->spool_size -= dcr->job_spool_size;
362    dcr->job_spool_size = 0;            /* zap size in input dcr */
363    V(dcr->dev->spool_mutex);
364    free_memory(rdev->dev_name);
365    free_pool_memory(rdev->errmsg);
366    /* Be careful to NULL the jcr and free rdev after free_dcr() */
367    rdcr->jcr = NULL;
368    rdcr->set_dev(NULL);
369    free_dcr(rdcr);
370    free(rdev);
371    dcr->spooling = true;           /* turn on spooling again */
372    dcr->despooling = false;
373    /*
374     * Note, if committing we leave the device blocked. It will be removed in
375     *  release_device();
376     */
377    if (!commit) {
378       dcr->dev->dunblock();
379    }
380    jcr->sendJobStatus(JS_Running);
381    return ok;
382 }
383
384 /*
385  * Read a block from the spool file
386  *
387  *  Returns RB_OK on success
388  *          RB_EOT when file done
389  *          RB_ERROR on error
390  */
391 static int read_block_from_spool_file(DCR *dcr)
392 {
393    uint32_t rlen;
394    ssize_t stat;
395    spool_hdr hdr;
396    DEV_BLOCK *block = dcr->block;
397    JCR *jcr = dcr->jcr;
398
399    rlen = sizeof(hdr);
400    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
401    if (stat == 0) {
402       Dmsg0(100, "EOT on spool read.\n");
403       return RB_EOT;
404    } else if (stat != (ssize_t)rlen) {
405       if (stat == -1) {
406          berrno be;
407          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
408               be.bstrerror());
409       } else {
410          Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
411          Jmsg2(jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
412       }
413       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
414       return RB_ERROR;
415    }
416    rlen = hdr.len;
417    if (rlen > block->buf_len) {
418       Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
419       Jmsg2(jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
420       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
421       return RB_ERROR;
422    }
423    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
424    if (stat != (ssize_t)rlen) {
425       Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
426       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
427       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
428       return RB_ERROR;
429    }
430    /* Setup write pointers */
431    block->binbuf = rlen;
432    block->bufp = block->buf + block->binbuf;
433    block->FirstIndex = hdr.FirstIndex;
434    block->LastIndex = hdr.LastIndex;
435    block->VolSessionId = dcr->jcr->VolSessionId;
436    block->VolSessionTime = dcr->jcr->VolSessionTime;
437    Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
438    return RB_OK;
439 }
440
441 /*
442  * Write a block to the spool file
443  *
444  *  Returns: true on success or EOT
445  *           false on hard error
446  */
447 bool write_block_to_spool_file(DCR *dcr)
448 {
449    uint32_t wlen, hlen;               /* length to write */
450    bool despool = false;
451    DEV_BLOCK *block = dcr->block;
452
453    if (job_canceled(dcr->jcr)) {
454       return false;
455    }
456    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
457    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
458       return true;
459    }
460
461    hlen = sizeof(spool_hdr);
462    wlen = block->binbuf;
463    P(dcr->dev->spool_mutex);
464    dcr->job_spool_size += hlen + wlen;
465    dcr->dev->spool_size += hlen + wlen;
466    if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
467        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
468       despool = true;
469    }
470    V(dcr->dev->spool_mutex);
471    P(mutex);
472    spool_stats.data_size += hlen + wlen;
473    if (spool_stats.data_size > spool_stats.max_data_size) {
474       spool_stats.max_data_size = spool_stats.data_size;
475    }
476    V(mutex);
477    if (despool) {
478       char ec1[30], ec2[30], ec3[30], ec4[30];
479       Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached: "
480          "JobSpoolSize=%s MaxJobSpool=%s DevSpoolSize=%s MaxDevSpool=%s\n"),
481          edit_uint64_with_commas(dcr->job_spool_size, ec1),
482          edit_uint64_with_commas(dcr->max_job_spool_size, ec2),
483          edit_uint64_with_commas(dcr->dev->spool_size, ec3),
484          edit_uint64_with_commas(dcr->dev->max_spool_size, ec4));
485
486       if (!despool_data(dcr, false)) {
487          Pmsg0(000, _("Bad return from despool in write_block.\n"));
488          return false;
489       }
490       /* Despooling cleared these variables so reset them */
491       P(dcr->dev->spool_mutex);
492       dcr->job_spool_size += hlen + wlen;
493       dcr->dev->spool_size += hlen + wlen;
494       V(dcr->dev->spool_mutex);
495       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
496    }
497
498
499    if (!write_spool_header(dcr)) {
500       return false;
501    }
502    if (!write_spool_data(dcr)) {
503      return false;
504    }
505
506    Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
507    empty_block(block);
508    return true;
509 }
510
511 static bool write_spool_header(DCR *dcr)
512 {
513    spool_hdr hdr;
514    ssize_t stat;
515    DEV_BLOCK *block = dcr->block;
516    JCR *jcr = dcr->jcr;
517
518    hdr.FirstIndex = block->FirstIndex;
519    hdr.LastIndex = block->LastIndex;
520    hdr.len = block->binbuf;
521
522    /* Write header */
523    for (int retry=0; retry<=1; retry++) {
524       stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
525       if (stat == -1) {
526          berrno be;
527          Jmsg(jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
528               be.bstrerror());
529          jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
530       }
531       if (stat != (ssize_t)sizeof(hdr)) {
532          Jmsg(jcr, M_ERROR, 0, _("Error writing header to spool file."
533               " Disk probably full. Attempting recovery. Wanted to write=%d got=%d\n"),
534               (int)stat, (int)sizeof(hdr));
535          /* If we wrote something, truncate it, then despool */
536          if (stat != -1) {
537 #if defined(HAVE_WIN32)
538             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
539 #else
540             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
541 #endif
542             if (ftruncate(dcr->spool_fd, pos - stat) != 0) {
543                berrno be;
544                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
545                   be.bstrerror());
546               /* Note, try continuing despite ftruncate problem */
547             }
548          }
549          if (!despool_data(dcr, false)) {
550             Jmsg(jcr, M_FATAL, 0, _("Fatal despooling error."));
551             jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
552             return false;
553          }
554          continue;                    /* try again */
555       }
556       return true;
557    }
558    Jmsg(jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
559    jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
560    return false;
561 }
562
563 static bool write_spool_data(DCR *dcr)
564 {
565    ssize_t stat;
566    DEV_BLOCK *block = dcr->block;
567    JCR *jcr = dcr->jcr;
568
569    /* Write data */
570    for (int retry=0; retry<=1; retry++) {
571       stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
572       if (stat == -1) {
573          berrno be;
574          Jmsg(jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
575               be.bstrerror());
576          jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
577       }
578       if (stat != (ssize_t)block->binbuf) {
579          /*
580           * If we wrote something, truncate it and the header, then despool
581           */
582          if (stat != -1) {
583 #if defined(HAVE_WIN32)
584             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
585 #else
586             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
587 #endif
588             if (ftruncate(dcr->spool_fd, pos - stat - sizeof(spool_hdr)) != 0) {
589                berrno be;
590                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
591                   be.bstrerror());
592                /* Note, try continuing despite ftruncate problem */
593             }
594          }
595          if (!despool_data(dcr, false)) {
596             Jmsg(jcr, M_FATAL, 0, _("Fatal despooling error."));
597             jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
598             return false;
599          }
600          if (!write_spool_header(dcr)) {
601             return false;
602          }
603          continue;                    /* try again */
604       }
605       return true;
606    }
607    Jmsg(jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
608    jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
609    return false;
610 }
611
612
613
614 bool are_attributes_spooled(JCR *jcr)
615 {
616    return jcr->spool_attributes && jcr->dir_bsock->m_spool_fd;
617 }
618
619 /*
620  * Create spool file for attributes.
621  *  This is done by "attaching" to the bsock, and when
622  *  it is called, the output is written to a file.
623  *  The actual spooling is turned on and off in
624  *  append.c only during writing of the attributes.
625  */
626 bool begin_attribute_spool(JCR *jcr)
627 {
628    if (!jcr->no_attributes && jcr->spool_attributes) {
629       return open_attr_spool_file(jcr, jcr->dir_bsock);
630    }
631    return true;
632 }
633
634 bool discard_attribute_spool(JCR *jcr)
635 {
636    if (are_attributes_spooled(jcr)) {
637       return close_attr_spool_file(jcr, jcr->dir_bsock);
638    }
639    return true;
640 }
641
642 static void update_attr_spool_size(ssize_t size)
643 {
644    P(mutex);
645    if (size > 0) {
646      if ((spool_stats.attr_size - size) > 0) {
647         spool_stats.attr_size -= size;
648      } else {
649         spool_stats.attr_size = 0;
650      }
651    }
652    V(mutex);
653 }
654
655 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
656 {
657    Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
658       jcr->Job, fd);
659 }
660
661 /*
662  * Tell Director where to find the attributes spool file 
663  *  Note, if we are not on the same machine, the Director will
664  *  return an error, and the higher level routine will transmit
665  *  the data record by record -- using bsock->despool().
666  */
667 static bool blast_attr_spool_file(JCR *jcr, boffset_t size)
668 {
669    /* send full spool file name */
670    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
671    make_unique_spool_filename(jcr, &name, jcr->dir_bsock->m_fd);
672    bash_spaces(name);
673    jcr->dir_bsock->fsend("BlastAttr Job=%s File=%s\n", jcr->Job, name);
674    free_pool_memory(name);
675    
676    if (jcr->dir_bsock->recv() <= 0) {
677       Jmsg(jcr, M_FATAL, 0, _("Network error on BlastAttributes.\n"));
678       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
679       return false;
680    }
681    
682    if (!bstrcmp(jcr->dir_bsock->msg, "1000 OK BlastAttr\n")) {
683       return false;
684    }
685    return true;
686 }
687
688 bool commit_attribute_spool(JCR *jcr)
689 {
690    boffset_t size, data_end;
691    char ec1[30];
692    char tbuf[100];
693    BSOCK *dir;
694
695    Dmsg1(100, "Commit attributes at %s\n", bstrftimes(tbuf, sizeof(tbuf),
696          (utime_t)time(NULL)));
697    if (are_attributes_spooled(jcr)) {
698       dir = jcr->dir_bsock;
699       if (fseeko(dir->m_spool_fd, 0, SEEK_END) != 0) {
700          berrno be;
701          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
702               be.bstrerror());
703          jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
704          goto bail_out;
705       }
706       size = ftello(dir->m_spool_fd);
707       if (jcr->is_JobStatus(JS_Incomplete)) {
708          data_end = dir->get_data_end();
709          /* Check and truncate to last valid data_end if necssary */
710          if (size > data_end) {
711             if (ftruncate(fileno(dir->m_spool_fd), data_end) != 0) {
712                berrno be;
713                Jmsg(jcr, M_FATAL, 0, _("Truncate on attributes file failed: ERR=%s\n"),
714                     be.bstrerror());
715                jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
716                goto bail_out;
717             }
718             Dmsg2(100, "=== Attrib spool truncated from %lld to %lld\n", 
719                   size, data_end);
720             size = data_end;
721          }
722       }
723       if (size < 0) {
724          berrno be;
725          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
726               be.bstrerror());
727          jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
728          goto bail_out;
729       }
730       P(mutex);
731       if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
732          spool_stats.max_attr_size = spool_stats.attr_size + size;
733       }
734       spool_stats.attr_size += size;
735       V(mutex);
736       jcr->sendJobStatus(JS_AttrDespooling);
737       Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
738             edit_uint64_with_commas(size, ec1));
739
740       if (!blast_attr_spool_file(jcr, size)) {
741          /* Can't read spool file from director side,
742           * send content over network.
743           */
744          dir->despool(update_attr_spool_size, size);
745       }
746       return close_attr_spool_file(jcr, dir);
747    }
748    return true;
749
750 bail_out:
751    close_attr_spool_file(jcr, dir);
752    return false;
753 }
754
755 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
756 {
757    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
758
759    make_unique_spool_filename(jcr, &name, bs->m_fd);
760    bs->m_spool_fd = fopen(name, "w+b");
761    if (!bs->m_spool_fd) {
762       berrno be;
763       Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
764            be.bstrerror());
765       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
766       free_pool_memory(name);
767       return false;
768    }
769    P(mutex);
770    spool_stats.attr_jobs++;
771    V(mutex);
772    free_pool_memory(name);
773    return true;
774 }
775
776 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
777 {
778    POOLMEM *name;
779
780    char tbuf[100];
781
782    Dmsg1(100, "Close attr spool file at %s\n", bstrftimes(tbuf, sizeof(tbuf),
783          (utime_t)time(NULL)));
784    if (!bs->m_spool_fd) {
785       return true;
786    }
787    name = get_pool_memory(PM_MESSAGE);
788    P(mutex);
789    spool_stats.attr_jobs--;
790    spool_stats.total_attr_jobs++;
791    V(mutex);
792    make_unique_spool_filename(jcr, &name, bs->m_fd);
793    fclose(bs->m_spool_fd);
794    unlink(name);
795    free_pool_memory(name);
796    bs->m_spool_fd = NULL;
797    bs->clear_spooling();
798    return true;
799 }