]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
3e26fa76bfeaeddc14260d27a96e57a5e87b842d
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2015 Kern Sibbald
5    Copyright (C) 2004-2014 Free Software Foundation Europe e.V.
6
7    The original author of Bacula is Kern Sibbald, with contributions
8    from many others, a complete list can be found in the file AUTHORS.
9
10    You may use this file and others of this release according to the
11    license defined in the LICENSE file, which includes the Affero General
12    Public License, v3.0 ("AGPLv3") and some additional permissions and
13    terms pursuant to its AGPLv3 Section 7.
14
15    This notice must be preserved when any source code is 
16    conveyed and/or propagated.
17
18    Bacula(R) is a registered trademark of Kern Sibbald.
19 */
20 /*
21  *  Spooling code
22  *
23  *      Kern Sibbald, March 2004
24  *
25  */
26
27 #include "bacula.h"
28 #include "stored.h"
29
30 /* Forward referenced subroutines */
31 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
32 static bool open_data_spool_file(DCR *dcr);
33 static bool close_data_spool_file(DCR *dcr);
34 static bool despool_data(DCR *dcr, bool commit);
35 static int  read_block_from_spool_file(DCR *dcr);
36 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
37 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
38 static bool write_spool_header(DCR *dcr);
39 static bool write_spool_data(DCR *dcr);
40
41 struct spool_stats_t {
42    uint32_t data_jobs;                /* current jobs spooling data */
43    uint32_t attr_jobs;
44    uint32_t total_data_jobs;          /* total jobs to have spooled data */
45    uint32_t total_attr_jobs;
46    int64_t max_data_size;             /* max data size */
47    int64_t max_attr_size;
48    int64_t data_size;                 /* current data size (all jobs running) */
49    int64_t attr_size;
50 };
51
52 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
53 spool_stats_t spool_stats;
54
55 /*
56  * Header for data spool record */
57 struct spool_hdr {
58    int32_t  FirstIndex;               /* FirstIndex for buffer */
59    int32_t  LastIndex;                /* LastIndex for buffer */
60    uint32_t len;                      /* length of next buffer */
61 };
62
63 enum {
64    RB_EOT = 1,
65    RB_ERROR,
66    RB_OK
67 };
68
69 void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
70 {
71    char ed1[30], ed2[30];
72    POOL_MEM msg(PM_MESSAGE);
73    int len;
74
75    len = Mmsg(msg, _("Spooling statistics:\n"));
76
77    if (spool_stats.data_jobs || spool_stats.max_data_size) {
78       len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
79          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
80          spool_stats.total_data_jobs,
81          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
82
83       sendit(msg.c_str(), len, arg);
84    }
85    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
86       len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
87          spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
88          spool_stats.total_attr_jobs,
89          edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
90
91       sendit(msg.c_str(), len, arg);
92    }
93 }
94
95 bool begin_data_spool(DCR *dcr)
96 {
97    bool stat = true;
98    if (!dcr->dev->is_dvd() && dcr->jcr->spool_data) {
99       Dmsg0(100, "Turning on data spooling\n");
100       dcr->spool_data = true;
101       stat = open_data_spool_file(dcr);
102       if (stat) {
103          dcr->spooling = true;
104          Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
105          P(mutex);
106          spool_stats.data_jobs++;
107          V(mutex);
108       }
109    }
110    return stat;
111 }
112
113 bool discard_data_spool(DCR *dcr)
114 {
115    if (dcr->spooling) {
116       Dmsg0(100, "Data spooling discarded\n");
117       return close_data_spool_file(dcr);
118    }
119    return true;
120 }
121
122 bool commit_data_spool(DCR *dcr)
123 {
124    bool stat;
125
126    if (dcr->spooling) {
127       Dmsg0(100, "Committing spooled data\n");
128       stat = despool_data(dcr, true /*commit*/);
129       if (!stat) {
130          Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
131          close_data_spool_file(dcr);
132          return false;
133       }
134       return close_data_spool_file(dcr);
135    }
136    return true;
137 }
138
139 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
140 {
141    const char *dir;
142    if (dcr->dev->device->spool_directory) {
143       dir = dcr->dev->device->spool_directory;
144    } else {
145       dir = working_directory;
146    }
147    Mmsg(name, "%s/%s.data.%u.%s.%s.spool", dir, my_name, dcr->jcr->JobId,
148         dcr->jcr->Job, dcr->device->hdr.name);
149 }
150
151
152 static bool open_data_spool_file(DCR *dcr)
153 {
154    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
155    int spool_fd;
156
157    make_unique_data_spool_filename(dcr, &name);
158    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
159       dcr->spool_fd = spool_fd;
160       dcr->jcr->spool_attributes = true;
161    } else {
162       berrno be;
163       Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
164            be.bstrerror());
165       free_pool_memory(name);
166       return false;
167    }
168    Dmsg1(100, "Created spool file: %s\n", name);
169    free_pool_memory(name);
170    return true;
171 }
172
173 static const char *spool_name = "*spool*";
174
175 /*
176  * NB! This routine locks the device, but if committing will
177  *     not unlock it. If not committing, it will be unlocked.
178  */
179 static bool despool_data(DCR *dcr, bool commit)
180 {
181    DEVICE *rdev;
182    DCR *rdcr;
183    bool ok = true;
184    DEV_BLOCK *block;
185    JCR *jcr = dcr->jcr;
186    int stat;
187    char ec1[50];
188
189    Dmsg0(100, "Despooling data\n");
190    if (jcr->dcr->job_spool_size == 0) {
191       Jmsg(jcr, M_WARNING, 0, _("Despooling zero bytes. Your disk is probably FULL!\n"));
192    }
193
194    /*
195     * Commit means that the job is done, so we commit, otherwise, we
196     *  are despooling because of user spool size max or some error
197     *  (e.g. filesystem full).
198     */
199    if (commit) {
200       Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
201          jcr->dcr->VolumeName,
202          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
203       jcr->setJobStatus(JS_DataCommitting);
204    } else {
205       Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
206          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
207       jcr->setJobStatus(JS_DataDespooling);
208    }
209    jcr->sendJobStatus(JS_DataDespooling);
210    dcr->despool_wait = true;
211    dcr->spooling = false;
212    /*
213     * We work with device blocked, but not locked so that
214     *  other threads -- e.g. reservations can lock the device
215     *  structure.
216     */
217    dcr->dblock(BST_DESPOOLING);
218    dcr->despool_wait = false;
219    dcr->despooling = true;
220
221    /*
222     * This is really quite kludgy and should be fixed some time.
223     * We create a dev structure to read from the spool file
224     * in rdev and rdcr.
225     */
226    rdev = (DEVICE *)malloc(sizeof(DEVICE));
227    memset(rdev, 0, sizeof(DEVICE));
228    rdev->dev_name = get_memory(strlen(spool_name)+1);
229    bstrncpy(rdev->dev_name, spool_name, strlen(spool_name)+1);
230    rdev->errmsg = get_pool_memory(PM_EMSG);
231    *rdev->errmsg = 0;
232    rdev->max_block_size = dcr->dev->max_block_size;
233    rdev->min_block_size = dcr->dev->min_block_size;
234    rdev->device = dcr->dev->device;
235    rdcr = new_dcr(jcr, NULL, rdev, SD_READ);
236    rdcr->spool_fd = dcr->spool_fd;
237    block = dcr->block;                /* save block */
238    dcr->block = rdcr->block;          /* make read and write block the same */
239
240    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
241    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
242
243 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
244    posix_fadvise(rdcr->spool_fd, 0, 0, POSIX_FADV_WILLNEED);
245 #endif
246
247    /* Add run time, to get current wait time */
248    int32_t despool_start = time(NULL) - jcr->run_time;
249
250    set_new_file_parameters(dcr);
251
252    for ( ; ok; ) {
253       if (job_canceled(jcr)) {
254          ok = false;
255          break;
256       }
257       stat = read_block_from_spool_file(rdcr);
258       if (stat == RB_EOT) {
259          break;
260       } else if (stat == RB_ERROR) {
261          ok = false;
262          break;
263       }
264       ok = dcr->write_block_to_device();
265       if (!ok) {
266          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
267                dcr->dev->print_name(), dcr->dev->bstrerror());
268          Pmsg2(000, "Fatal append error on device %s: ERR=%s\n",
269                dcr->dev->print_name(), dcr->dev->bstrerror());
270          /* Force in case Incomplete set */
271          jcr->forceJobStatus(JS_FatalError);
272       }
273       Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
274    }
275
276    if (!dir_create_jobmedia_record(dcr)) {
277       Jmsg2(jcr, M_FATAL, 0, _("Could not create JobMedia record for Volume=\"%s\" Job=%s\n"),
278          dcr->getVolCatName(), jcr->Job);
279       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
280    }
281    flush_jobmedia_queue(jcr);
282    /* Set new file/block parameters for current dcr */
283    set_new_file_parameters(dcr);
284
285    /*
286     * Subtracting run_time give us elapsed time - wait_time since
287     * we started despooling. Note, don't use time_t as it is 32 or 64
288     * bits depending on the OS and doesn't edit with %d
289     */
290    int32_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
291
292    if (despool_elapsed <= 0) {
293       despool_elapsed = 1;
294    }
295
296    Jmsg(jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s Bytes/second\n"),
297          despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
298          edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
299
300    dcr->block = block;                /* reset block */
301
302    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
303    if (ftruncate(rdcr->spool_fd, 0) != 0) {
304       berrno be;
305       Jmsg(jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
306          be.bstrerror());
307       /* Note, try continuing despite ftruncate problem */
308    }
309
310    P(mutex);
311    if (spool_stats.data_size < dcr->job_spool_size) {
312       spool_stats.data_size = 0;
313    } else {
314       spool_stats.data_size -= dcr->job_spool_size;
315    }
316    V(mutex);
317    P(dcr->dev->spool_mutex);
318    dcr->dev->spool_size -= dcr->job_spool_size;
319    dcr->job_spool_size = 0;            /* zap size in input dcr */
320    V(dcr->dev->spool_mutex);
321    free_memory(rdev->dev_name);
322    free_pool_memory(rdev->errmsg);
323    /* Be careful to NULL the jcr and free rdev after free_dcr() */
324    rdcr->jcr = NULL;
325    rdcr->set_dev(NULL);
326    free_dcr(rdcr);
327    free(rdev);
328    dcr->spooling = true;           /* turn on spooling again */
329    dcr->despooling = false;
330    /*
331     * Note, if committing we leave the device blocked. It will be removed in
332     *  release_device();
333     */
334    if (!commit) {
335       dcr->dev->dunblock();
336    }
337    jcr->sendJobStatus(JS_Running);
338    return ok;
339 }
340
341 /*
342  * Read a block from the spool file
343  *
344  *  Returns RB_OK on success
345  *          RB_EOT when file done
346  *          RB_ERROR on error
347  */
348 static int read_block_from_spool_file(DCR *dcr)
349 {
350    uint32_t rlen;
351    ssize_t stat;
352    spool_hdr hdr;
353    DEV_BLOCK *block = dcr->block;
354    JCR *jcr = dcr->jcr;
355
356    rlen = sizeof(hdr);
357    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
358    if (stat == 0) {
359       Dmsg0(100, "EOT on spool read.\n");
360       return RB_EOT;
361    } else if (stat != (ssize_t)rlen) {
362       if (stat == -1) {
363          berrno be;
364          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
365               be.bstrerror());
366       } else {
367          Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
368          Jmsg2(jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
369       }
370       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
371       return RB_ERROR;
372    }
373    rlen = hdr.len;
374    if (rlen > block->buf_len) {
375       Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
376       Jmsg2(jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
377       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
378       return RB_ERROR;
379    }
380    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
381    if (stat != (ssize_t)rlen) {
382       Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
383       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
384       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
385       return RB_ERROR;
386    }
387    /* Setup write pointers */
388    block->binbuf = rlen;
389    block->bufp = block->buf + block->binbuf;
390    block->FirstIndex = hdr.FirstIndex;
391    block->LastIndex = hdr.LastIndex;
392    block->VolSessionId = dcr->jcr->VolSessionId;
393    block->VolSessionTime = dcr->jcr->VolSessionTime;
394    Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
395    return RB_OK;
396 }
397
398 /*
399  * Write a block to the spool file
400  *
401  *  Returns: true on success or EOT
402  *           false on hard error
403  */
404 bool write_block_to_spool_file(DCR *dcr)
405 {
406    uint32_t wlen, hlen;               /* length to write */
407    bool despool = false;
408    DEV_BLOCK *block = dcr->block;
409
410    if (job_canceled(dcr->jcr)) {
411       return false;
412    }
413    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
414    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
415       return true;
416    }
417
418    hlen = sizeof(spool_hdr);
419    wlen = block->binbuf;
420    P(dcr->dev->spool_mutex);
421    dcr->job_spool_size += hlen + wlen;
422    dcr->dev->spool_size += hlen + wlen;
423    if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
424        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
425       despool = true;
426    }
427    V(dcr->dev->spool_mutex);
428    P(mutex);
429    spool_stats.data_size += hlen + wlen;
430    if (spool_stats.data_size > spool_stats.max_data_size) {
431       spool_stats.max_data_size = spool_stats.data_size;
432    }
433    V(mutex);
434    if (despool) {
435       char ec1[30], ec2[30];
436       if (dcr->max_job_spool_size > 0) {
437          Jmsg(dcr->jcr, M_INFO, 0, _("User specified Job spool size reached: "
438             "JobSpoolSize=%s MaxJobSpoolSize=%s\n"),
439             edit_uint64_with_commas(dcr->job_spool_size, ec1),
440             edit_uint64_with_commas(dcr->max_job_spool_size, ec2));
441       } else {
442          Jmsg(dcr->jcr, M_INFO, 0, _("User specified Device spool size reached: "
443             "DevSpoolSize=%s MaxDevSpoolSize=%s\n"),
444             edit_uint64_with_commas(dcr->dev->spool_size, ec1),
445             edit_uint64_with_commas(dcr->dev->max_spool_size, ec2));
446       }
447
448       if (!despool_data(dcr, false)) {
449          Pmsg0(000, _("Bad return from despool in write_block.\n"));
450          return false;
451       }
452       /* Despooling cleared these variables so reset them */
453       P(dcr->dev->spool_mutex);
454       dcr->job_spool_size += hlen + wlen;
455       dcr->dev->spool_size += hlen + wlen;
456       V(dcr->dev->spool_mutex);
457       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
458    }
459
460
461    if (!write_spool_header(dcr)) {
462       return false;
463    }
464    if (!write_spool_data(dcr)) {
465      return false;
466    }
467
468    Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
469    empty_block(block);
470    return true;
471 }
472
473 static bool write_spool_header(DCR *dcr)
474 {
475    spool_hdr hdr;
476    ssize_t stat;
477    DEV_BLOCK *block = dcr->block;
478    JCR *jcr = dcr->jcr;
479
480    hdr.FirstIndex = block->FirstIndex;
481    hdr.LastIndex = block->LastIndex;
482    hdr.len = block->binbuf;
483
484    /* Write header */
485    for (int retry=0; retry<=1; retry++) {
486       stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
487       if (stat == -1) {
488          berrno be;
489          Jmsg(jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
490               be.bstrerror());
491          jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
492       }
493       if (stat != (ssize_t)sizeof(hdr)) {
494          Jmsg(jcr, M_ERROR, 0, _("Error writing header to spool file."
495               " Disk probably full. Attempting recovery. Wanted to write=%d got=%d\n"),
496               (int)stat, (int)sizeof(hdr));
497          /* If we wrote something, truncate it, then despool */
498          if (stat != -1) {
499 #if defined(HAVE_WIN32)
500             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
501 #else
502             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
503 #endif
504             if (ftruncate(dcr->spool_fd, pos - stat) != 0) {
505                berrno be;
506                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
507                   be.bstrerror());
508               /* Note, try continuing despite ftruncate problem */
509             }
510          }
511          if (!despool_data(dcr, false)) {
512             Jmsg(jcr, M_FATAL, 0, _("Fatal despooling error."));
513             jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
514             return false;
515          }
516          continue;                    /* try again */
517       }
518       return true;
519    }
520    Jmsg(jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
521    jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
522    return false;
523 }
524
525 static bool write_spool_data(DCR *dcr)
526 {
527    ssize_t stat;
528    DEV_BLOCK *block = dcr->block;
529    JCR *jcr = dcr->jcr;
530
531    /* Write data */
532    for (int retry=0; retry<=1; retry++) {
533       stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
534       if (stat == -1) {
535          berrno be;
536          Jmsg(jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
537               be.bstrerror());
538          jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
539       }
540       if (stat != (ssize_t)block->binbuf) {
541          /*
542           * If we wrote something, truncate it and the header, then despool
543           */
544          if (stat != -1) {
545 #if defined(HAVE_WIN32)
546             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
547 #else
548             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
549 #endif
550             if (ftruncate(dcr->spool_fd, pos - stat - sizeof(spool_hdr)) != 0) {
551                berrno be;
552                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
553                   be.bstrerror());
554                /* Note, try continuing despite ftruncate problem */
555             }
556          }
557          if (!despool_data(dcr, false)) {
558             Jmsg(jcr, M_FATAL, 0, _("Fatal despooling error."));
559             jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
560             return false;
561          }
562          if (!write_spool_header(dcr)) {
563             return false;
564          }
565          continue;                    /* try again */
566       }
567       return true;
568    }
569    Jmsg(jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
570    jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
571    return false;
572 }
573
574 static bool close_data_spool_file(DCR *dcr)
575 {
576    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
577
578    P(mutex);
579    spool_stats.data_jobs--;
580    spool_stats.total_data_jobs++;
581    if (spool_stats.data_size < dcr->job_spool_size) {
582       spool_stats.data_size = 0;
583    } else {
584       spool_stats.data_size -= dcr->job_spool_size;
585    }
586    V(mutex);
587    P(dcr->dev->spool_mutex);
588    dcr->job_spool_size = 0;
589    V(dcr->dev->spool_mutex);
590
591    make_unique_data_spool_filename(dcr, &name);
592    close(dcr->spool_fd);
593    dcr->spool_fd = -1;
594    dcr->spooling = false;
595    unlink(name);
596    Dmsg1(100, "Deleted spool file: %s\n", name);
597    free_pool_memory(name);
598    return true;
599 }
600
601 bool are_attributes_spooled(JCR *jcr)
602 {
603    return jcr->spool_attributes && jcr->dir_bsock->m_spool_fd;
604 }
605
606 /*
607  * Create spool file for attributes.
608  *  This is done by "attaching" to the bsock, and when
609  *  it is called, the output is written to a file.
610  *  The actual spooling is turned on and off in
611  *  append.c only during writing of the attributes.
612  */
613 bool begin_attribute_spool(JCR *jcr)
614 {
615    if (!jcr->no_attributes && jcr->spool_attributes) {
616       return open_attr_spool_file(jcr, jcr->dir_bsock);
617    }
618    return true;
619 }
620
621 static void update_attr_spool_size(ssize_t size)
622 {
623    P(mutex);
624    if (size > 0) {
625      if ((spool_stats.attr_size - size) > 0) {
626         spool_stats.attr_size -= size;
627      } else {
628         spool_stats.attr_size = 0;
629      }
630    }
631    V(mutex);
632 }
633
634 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
635 {
636    Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
637       jcr->Job, fd);
638 }
639
640 /*
641  * Tell Director where to find the attributes spool file
642  *  Note, if we are not on the same machine, the Director will
643  *  return an error, and the higher level routine will transmit
644  *  the data record by record -- using bsock->despool().
645  */
646 static bool blast_attr_spool_file(JCR *jcr, boffset_t size)
647 {
648    /* send full spool file name */
649    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
650    make_unique_spool_filename(jcr, &name, jcr->dir_bsock->m_fd);
651    bash_spaces(name);
652    jcr->dir_bsock->fsend("BlastAttr Job=%s File=%s\n", jcr->Job, name);
653    free_pool_memory(name);
654
655    if (jcr->dir_bsock->recv() <= 0) {
656       Jmsg(jcr, M_FATAL, 0, _("Network error on BlastAttributes.\n"));
657       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
658       return false;
659    }
660
661    if (!bstrcmp(jcr->dir_bsock->msg, "1000 OK BlastAttr\n")) {
662       return false;
663    }
664    return true;
665 }
666
667 bool commit_attribute_spool(JCR *jcr)
668 {
669    boffset_t size, data_end;
670    char ec1[30];
671    char tbuf[100];
672    BSOCK *dir;
673
674    Dmsg1(100, "Commit attributes at %s\n", bstrftimes(tbuf, sizeof(tbuf),
675          (utime_t)time(NULL)));
676    if (are_attributes_spooled(jcr)) {
677       dir = jcr->dir_bsock;
678       if (fseeko(dir->m_spool_fd, 0, SEEK_END) != 0) {
679          berrno be;
680          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
681               be.bstrerror());
682          jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
683          goto bail_out;
684       }
685       size = ftello(dir->m_spool_fd);
686       /* For Incomplete Job truncate spool file to last valid data_end if necssary */
687       if (jcr->is_JobStatus(JS_Incomplete)) {
688          data_end = dir->get_last_data_end();
689          if (size > data_end) {
690             if (ftruncate(fileno(dir->m_spool_fd), data_end) != 0) {
691                berrno be;
692                Jmsg(jcr, M_FATAL, 0, _("Truncate on attributes file failed: ERR=%s\n"),
693                     be.bstrerror());
694                jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
695                goto bail_out;
696             }
697             Dmsg2(100, "=== Attrib spool truncated from %lld to %lld\n",
698                   size, data_end);
699             size = data_end;
700          }
701       }
702       if (size < 0) {
703          berrno be;
704          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
705               be.bstrerror());
706          jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
707          goto bail_out;
708       }
709       P(mutex);
710       if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
711          spool_stats.max_attr_size = spool_stats.attr_size + size;
712       }
713       spool_stats.attr_size += size;
714       V(mutex);
715       jcr->sendJobStatus(JS_AttrDespooling);
716       Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
717             edit_uint64_with_commas(size, ec1));
718
719       if (!blast_attr_spool_file(jcr, size)) {
720          /* Can't read spool file from director side,
721           * send content over network.
722           */
723          dir->despool(update_attr_spool_size, size);
724       }
725       return close_attr_spool_file(jcr, dir);
726    }
727    return true;
728
729 bail_out:
730    close_attr_spool_file(jcr, dir);
731    return false;
732 }
733
734 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
735 {
736    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
737
738    make_unique_spool_filename(jcr, &name, bs->m_fd);
739    bs->m_spool_fd = fopen(name, "w+b");
740    if (!bs->m_spool_fd) {
741       berrno be;
742       Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
743            be.bstrerror());
744       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
745       free_pool_memory(name);
746       return false;
747    }
748    P(mutex);
749    spool_stats.attr_jobs++;
750    V(mutex);
751    free_pool_memory(name);
752    return true;
753 }
754
755 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
756 {
757    POOLMEM *name;
758
759    char tbuf[100];
760
761    Dmsg1(100, "Close attr spool file at %s\n", bstrftimes(tbuf, sizeof(tbuf),
762          (utime_t)time(NULL)));
763    if (!bs->m_spool_fd) {
764       return true;
765    }
766    name = get_pool_memory(PM_MESSAGE);
767    P(mutex);
768    spool_stats.attr_jobs--;
769    spool_stats.total_attr_jobs++;
770    V(mutex);
771    make_unique_spool_filename(jcr, &name, bs->m_fd);
772    fclose(bs->m_spool_fd);
773    unlink(name);
774    free_pool_memory(name);
775    bs->m_spool_fd = NULL;
776    bs->clear_spooling();
777    return true;
778 }
779
780 bool discard_attribute_spool(JCR *jcr)
781 {
782    if (are_attributes_spooled(jcr)) {
783       return close_attr_spool_file(jcr, jcr->dir_bsock);
784    }
785    return true;
786 }