]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
Make restart2-test work
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2004-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  *  Spooling code
30  *
31  *      Kern Sibbald, March 2004
32  *
33  */
34
35 #include "bacula.h"
36 #include "stored.h"
37
38 /* Forward referenced subroutines */
39 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
40 static bool open_data_spool_file(DCR *dcr);
41 static bool close_data_spool_file(DCR *dcr);
42 static bool despool_data(DCR *dcr, bool commit);
43 static int  read_block_from_spool_file(DCR *dcr);
44 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
45 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
46 static bool write_spool_header(DCR *dcr);
47 static bool write_spool_data(DCR *dcr);
48
49 struct spool_stats_t {
50    uint32_t data_jobs;                /* current jobs spooling data */
51    uint32_t attr_jobs;
52    uint32_t total_data_jobs;          /* total jobs to have spooled data */
53    uint32_t total_attr_jobs;
54    int64_t max_data_size;             /* max data size */
55    int64_t max_attr_size;
56    int64_t data_size;                 /* current data size (all jobs running) */
57    int64_t attr_size;
58 };
59
60 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
61 spool_stats_t spool_stats;
62
63 /*
64  * Header for data spool record */
65 struct spool_hdr {
66    int32_t  FirstIndex;               /* FirstIndex for buffer */
67    int32_t  LastIndex;                /* LastIndex for buffer */
68    uint32_t len;                      /* length of next buffer */
69 };
70
71 enum {
72    RB_EOT = 1,
73    RB_ERROR,
74    RB_OK
75 };
76
77 void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
78 {
79    char ed1[30], ed2[30];
80    POOL_MEM msg(PM_MESSAGE);
81    int len;
82
83    len = Mmsg(msg, _("Spooling statistics:\n"));
84
85    if (spool_stats.data_jobs || spool_stats.max_data_size) {
86       len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
87          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
88          spool_stats.total_data_jobs,
89          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
90
91       sendit(msg.c_str(), len, arg);
92    }
93    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
94       len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
95          spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
96          spool_stats.total_attr_jobs,
97          edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
98    
99       sendit(msg.c_str(), len, arg);
100    }
101 }
102
103 bool begin_data_spool(DCR *dcr)
104 {
105    bool stat = true;
106    if (!dcr->dev->is_dvd() && dcr->jcr->spool_data) {
107       Dmsg0(100, "Turning on data spooling\n");
108       dcr->spool_data = true;
109       stat = open_data_spool_file(dcr);
110       if (stat) {
111          dcr->spooling = true;
112          Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
113          P(mutex);
114          spool_stats.data_jobs++;
115          V(mutex);
116       }
117    }
118    return stat;
119 }
120
121 bool discard_data_spool(DCR *dcr)
122 {
123    if (dcr->spooling) {
124       Dmsg0(100, "Data spooling discarded\n");
125       return close_data_spool_file(dcr);
126    }
127    return true;
128 }
129
130 bool commit_data_spool(DCR *dcr)
131 {
132    bool stat;
133
134    if (dcr->spooling) {
135       Dmsg0(100, "Committing spooled data\n");
136       stat = despool_data(dcr, true /*commit*/);
137       if (!stat) {
138          Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
139          close_data_spool_file(dcr);
140          return false;
141       }
142       return close_data_spool_file(dcr);
143    }
144    return true;
145 }
146
147 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
148 {
149    const char *dir;
150    if (dcr->dev->device->spool_directory) {
151       dir = dcr->dev->device->spool_directory;
152    } else {
153       dir = working_directory;
154    }
155    Mmsg(name, "%s/%s.data.%u.%s.%s.spool", dir, my_name, dcr->jcr->JobId,
156         dcr->jcr->Job, dcr->device->hdr.name);
157 }
158
159
160 static bool open_data_spool_file(DCR *dcr)
161 {
162    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
163    int spool_fd;
164
165    make_unique_data_spool_filename(dcr, &name);
166    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
167       dcr->spool_fd = spool_fd;
168       dcr->jcr->spool_attributes = true;
169    } else {
170       berrno be;
171       Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
172            be.bstrerror());
173       free_pool_memory(name);
174       return false;
175    }
176    Dmsg1(100, "Created spool file: %s\n", name);
177    free_pool_memory(name);
178    return true;
179 }
180
181 static bool close_data_spool_file(DCR *dcr)
182 {
183    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
184
185    P(mutex);
186    spool_stats.data_jobs--;
187    spool_stats.total_data_jobs++;
188    if (spool_stats.data_size < dcr->job_spool_size) {
189       spool_stats.data_size = 0;
190    } else {
191       spool_stats.data_size -= dcr->job_spool_size;
192    }
193    dcr->job_spool_size = 0;
194    V(mutex);
195
196    make_unique_data_spool_filename(dcr, &name);
197    close(dcr->spool_fd);
198    dcr->spool_fd = -1;
199    dcr->spooling = false;
200    unlink(name);
201    Dmsg1(100, "Deleted spool file: %s\n", name);
202    free_pool_memory(name);
203    return true;
204 }
205
206 static const char *spool_name = "*spool*";
207
208 /*
209  * NB! This routine locks the device, but if committing will
210  *     not unlock it. If not committing, it will be unlocked.
211  */
212 static bool despool_data(DCR *dcr, bool commit)
213 {
214    DEVICE *rdev;
215    DCR *rdcr;
216    bool ok = true;
217    DEV_BLOCK *block;
218    JCR *jcr = dcr->jcr;
219    int stat;
220    char ec1[50];
221    BSOCK *dir = jcr->dir_bsock;
222
223    Dmsg0(100, "Despooling data\n");
224    if (jcr->dcr->job_spool_size == 0) {
225       Jmsg(jcr, M_WARNING, 0, _("Despooling zero bytes. Your disk is probably FULL!\n"));
226    }
227
228    /*
229     * Commit means that the job is done, so we commit, otherwise, we
230     *  are despooling because of user spool size max or some error  
231     *  (e.g. filesystem full).
232     */
233    if (commit) {
234       Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
235          jcr->dcr->VolumeName,
236          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
237       set_jcr_job_status(jcr, JS_DataCommitting);
238    } else {
239       Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
240          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
241       set_jcr_job_status(jcr, JS_DataDespooling);
242    }
243    set_jcr_job_status(jcr, JS_DataDespooling);
244    dir_send_job_status(jcr);
245    dcr->despool_wait = true;
246    dcr->spooling = false;
247    /*
248     * We work with device blocked, but not locked so that
249     *  other threads -- e.g. reservations can lock the device
250     *  structure.
251     */
252    dcr->dblock(BST_DESPOOLING);
253    dcr->despool_wait = false;
254    dcr->despooling = true;
255
256    /*
257     * This is really quite kludgy and should be fixed some time.
258     * We create a dev structure to read from the spool file
259     * in rdev and rdcr.
260     */
261    rdev = (DEVICE *)malloc(sizeof(DEVICE));
262    memset(rdev, 0, sizeof(DEVICE));
263    rdev->dev_name = get_memory(strlen(spool_name)+1);
264    bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
265    rdev->errmsg = get_pool_memory(PM_EMSG);
266    *rdev->errmsg = 0;
267    rdev->max_block_size = dcr->dev->max_block_size;
268    rdev->min_block_size = dcr->dev->min_block_size;
269    rdev->device = dcr->dev->device;
270    rdcr = new_dcr(jcr, NULL, rdev);
271    rdcr->spool_fd = dcr->spool_fd;
272    block = dcr->block;                /* save block */
273    dcr->block = rdcr->block;          /* make read and write block the same */
274
275    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
276    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
277
278 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
279    posix_fadvise(rdcr->spool_fd, 0, 0, POSIX_FADV_WILLNEED);
280 #endif
281
282    /* Add run time, to get current wait time */
283    int32_t despool_start = time(NULL) - jcr->run_time;
284
285    set_new_file_parameters(dcr);
286
287    for ( ; ok; ) {
288       if (job_canceled(jcr)) {
289          ok = false;
290          break;
291       }
292       stat = read_block_from_spool_file(rdcr);
293       if (stat == RB_EOT) {
294          break;
295       } else if (stat == RB_ERROR) {
296          ok = false;
297          break;
298       }
299       ok = write_block_to_device(dcr);
300       if (!ok) {
301          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
302                dcr->dev->print_name(), dcr->dev->bstrerror());
303          Dmsg2(000, "Fatal append error on device %s: ERR=%s\n",
304                dcr->dev->print_name(), dcr->dev->bstrerror());
305       }
306       Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
307    }
308
309    /*
310     * If this Job is incomplete, we need to backup the FileIndex
311     *  to the last correctly saved file so that the JobMedia
312     *  LastIndex is correct.
313     */
314    if (jcr->is_JobStatus(JS_Incomplete)) {
315       dcr->VolLastIndex = dir->get_FileIndex();
316       Dmsg1(100, "======= Set FI=%ld\n", dir->get_FileIndex());
317    }
318
319    if (!dir_create_jobmedia_record(dcr)) {
320       Jmsg2(jcr, M_FATAL, 0, _("Could not create JobMedia record for Volume=\"%s\" Job=%s\n"),
321          dcr->getVolCatName(), jcr->Job);
322    }
323    /* Set new file/block parameters for current dcr */
324    set_new_file_parameters(dcr);
325
326    /*
327     * Subtracting run_time give us elapsed time - wait_time since 
328     * we started despooling. Note, don't use time_t as it is 32 or 64
329     * bits depending on the OS and doesn't edit with %d
330     */
331    int32_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
332
333    if (despool_elapsed <= 0) {
334       despool_elapsed = 1;
335    }
336
337    Jmsg(dcr->jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s Bytes/second\n"),
338          despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
339          edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
340
341    dcr->block = block;                /* reset block */
342
343    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
344    if (ftruncate(rdcr->spool_fd, 0) != 0) {
345       berrno be;
346       Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
347          be.bstrerror());
348       /* Note, try continuing despite ftruncate problem */
349    }
350
351    P(mutex);
352    if (spool_stats.data_size < dcr->job_spool_size) {
353       spool_stats.data_size = 0;
354    } else {
355       spool_stats.data_size -= dcr->job_spool_size;
356    }
357    V(mutex);
358    P(dcr->dev->spool_mutex);
359    dcr->dev->spool_size -= dcr->job_spool_size;
360    dcr->job_spool_size = 0;            /* zap size in input dcr */
361    V(dcr->dev->spool_mutex);
362    free_memory(rdev->dev_name);
363    free_pool_memory(rdev->errmsg);
364    /* Be careful to NULL the jcr and free rdev after free_dcr() */
365    rdcr->jcr = NULL;
366    rdcr->dev = NULL;
367    free_dcr(rdcr);
368    free(rdev);
369    dcr->spooling = true;           /* turn on spooling again */
370    dcr->despooling = false;
371    /*
372     * Note, if committing we leave the device blocked. It will be removed in
373     *  release_device();
374     */
375    if (!commit) {
376       dcr->dev->dunblock();
377    }
378    set_jcr_job_status(jcr, JS_Running);
379    dir_send_job_status(jcr);
380    return ok;
381 }
382
383 /*
384  * Read a block from the spool file
385  *
386  *  Returns RB_OK on success
387  *          RB_EOT when file done
388  *          RB_ERROR on error
389  */
390 static int read_block_from_spool_file(DCR *dcr)
391 {
392    uint32_t rlen;
393    ssize_t stat;
394    spool_hdr hdr;
395    DEV_BLOCK *block = dcr->block;
396
397    rlen = sizeof(hdr);
398    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
399    if (stat == 0) {
400       Dmsg0(100, "EOT on spool read.\n");
401       return RB_EOT;
402    } else if (stat != (ssize_t)rlen) {
403       if (stat == -1) {
404          berrno be;
405          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
406               be.bstrerror());
407       } else {
408          Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
409          Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
410       }
411       return RB_ERROR;
412    }
413    rlen = hdr.len;
414    if (rlen > block->buf_len) {
415       Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
416       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
417       return RB_ERROR;
418    }
419    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
420    if (stat != (ssize_t)rlen) {
421       Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
422       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
423       return RB_ERROR;
424    }
425    /* Setup write pointers */
426    block->binbuf = rlen;
427    block->bufp = block->buf + block->binbuf;
428    block->FirstIndex = hdr.FirstIndex;
429    block->LastIndex = hdr.LastIndex;
430    block->VolSessionId = dcr->jcr->VolSessionId;
431    block->VolSessionTime = dcr->jcr->VolSessionTime;
432    Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
433    return RB_OK;
434 }
435
436 /*
437  * Write a block to the spool file
438  *
439  *  Returns: true on success or EOT
440  *           false on hard error
441  */
442 bool write_block_to_spool_file(DCR *dcr)
443 {
444    uint32_t wlen, hlen;               /* length to write */
445    bool despool = false;
446    DEV_BLOCK *block = dcr->block;
447
448    if (job_canceled(dcr->jcr)) {
449       return false;
450    }
451    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
452    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
453       return true;
454    }
455
456    hlen = sizeof(spool_hdr);
457    wlen = block->binbuf;
458    P(dcr->dev->spool_mutex);
459    dcr->job_spool_size += hlen + wlen;
460    dcr->dev->spool_size += hlen + wlen;
461    if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
462        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
463       despool = true;
464    }
465    V(dcr->dev->spool_mutex);
466    P(mutex);
467    spool_stats.data_size += hlen + wlen;
468    if (spool_stats.data_size > spool_stats.max_data_size) {
469       spool_stats.max_data_size = spool_stats.data_size;
470    }
471    V(mutex);
472    if (despool) {
473 #ifdef xDEBUG
474       char ec1[30], ec2[30], ec3[30], ec4[30];
475       Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
476             "max_job_size=%s job_size=%s\n",
477             edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
478             edit_uint64_with_commas(dcr->job_spool_size, ec2),
479             edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
480             edit_uint64_with_commas(dcr->dev->spool_size, ec4));
481 #endif
482       Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
483       if (!despool_data(dcr, false)) {
484          Pmsg0(000, _("Bad return from despool in write_block.\n"));
485          return false;
486       }
487       /* Despooling cleared these variables so reset them */
488       P(dcr->dev->spool_mutex);
489       dcr->job_spool_size += hlen + wlen;
490       dcr->dev->spool_size += hlen + wlen;
491       V(dcr->dev->spool_mutex);
492       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
493    }
494
495
496    if (!write_spool_header(dcr)) {
497       return false;
498    }
499    if (!write_spool_data(dcr)) {
500      return false;
501    }
502
503    Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
504    empty_block(block);
505    return true;
506 }
507
508 static bool write_spool_header(DCR *dcr)
509 {
510    spool_hdr hdr;
511    ssize_t stat;
512    DEV_BLOCK *block = dcr->block;
513
514    hdr.FirstIndex = block->FirstIndex;
515    hdr.LastIndex = block->LastIndex;
516    hdr.len = block->binbuf;
517
518    /* Write header */
519    for (int retry=0; retry<=1; retry++) {
520       stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
521       if (stat == -1) {
522          berrno be;
523          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
524               be.bstrerror());
525       }
526       if (stat != (ssize_t)sizeof(hdr)) {
527          Jmsg(dcr->jcr, M_ERROR, 0, _("Error writing header to spool file."
528               " Disk probably full. Attempting recovery. Wanted to write=%d got=%d\n"),
529               (int)stat, (int)sizeof(hdr));
530          /* If we wrote something, truncate it, then despool */
531          if (stat != -1) {
532 #if defined(HAVE_WIN32)
533             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
534 #else
535             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
536 #endif
537             if (ftruncate(dcr->spool_fd, pos - stat) != 0) {
538                berrno be;
539                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
540                   be.bstrerror());
541               /* Note, try continuing despite ftruncate problem */
542             }
543          }
544          if (!despool_data(dcr, false)) {
545             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
546             return false;
547          }
548          continue;                    /* try again */
549       }
550       return true;
551    }
552    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
553    return false;
554 }
555
556 static bool write_spool_data(DCR *dcr)
557 {
558    ssize_t stat;
559    DEV_BLOCK *block = dcr->block;
560
561    /* Write data */
562    for (int retry=0; retry<=1; retry++) {
563       stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
564       if (stat == -1) {
565          berrno be;
566          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
567               be.bstrerror());
568       }
569       if (stat != (ssize_t)block->binbuf) {
570          /*
571           * If we wrote something, truncate it and the header, then despool
572           */
573          if (stat != -1) {
574 #if defined(HAVE_WIN32)
575             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
576 #else
577             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
578 #endif
579             if (ftruncate(dcr->spool_fd, pos - stat - sizeof(spool_hdr)) != 0) {
580                berrno be;
581                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
582                   be.bstrerror());
583                /* Note, try continuing despite ftruncate problem */
584             }
585          }
586          if (!despool_data(dcr, false)) {
587             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
588             return false;
589          }
590          if (!write_spool_header(dcr)) {
591             return false;
592          }
593          continue;                    /* try again */
594       }
595       return true;
596    }
597    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
598    return false;
599 }
600
601
602
603 bool are_attributes_spooled(JCR *jcr)
604 {
605    return jcr->spool_attributes && jcr->dir_bsock->m_spool_fd;
606 }
607
608 /*
609  * Create spool file for attributes.
610  *  This is done by "attaching" to the bsock, and when
611  *  it is called, the output is written to a file.
612  *  The actual spooling is turned on and off in
613  *  append.c only during writing of the attributes.
614  */
615 bool begin_attribute_spool(JCR *jcr)
616 {
617    if (!jcr->no_attributes && jcr->spool_attributes) {
618       return open_attr_spool_file(jcr, jcr->dir_bsock);
619    }
620    return true;
621 }
622
623 bool discard_attribute_spool(JCR *jcr)
624 {
625    if (are_attributes_spooled(jcr)) {
626       return close_attr_spool_file(jcr, jcr->dir_bsock);
627    }
628    return true;
629 }
630
631 static void update_attr_spool_size(ssize_t size)
632 {
633    P(mutex);
634    if (size > 0) {
635      if ((spool_stats.attr_size - size) > 0) {
636         spool_stats.attr_size -= size;
637      } else {
638         spool_stats.attr_size = 0;
639      }
640    }
641    V(mutex);
642 }
643
644 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
645 {
646    Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
647       jcr->Job, fd);
648 }
649
650 /*
651  * Tell Director where to find the attributes spool file 
652  *  Note, if we are not on the same machine, the Director will
653  *  return an error, and the higher level routine will transmit
654  *  the data record by record -- using bsock->despool().
655  */
656 static bool blast_attr_spool_file(JCR *jcr, boffset_t size)
657 {
658    /* send full spool file name */
659    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
660    make_unique_spool_filename(jcr, &name, jcr->dir_bsock->m_fd);
661    bash_spaces(name);
662    jcr->dir_bsock->fsend("BlastAttr Job=%s File=%s\n", jcr->Job, name);
663    free_pool_memory(name);
664    
665    if (jcr->dir_bsock->recv() <= 0) {
666       Jmsg(jcr, M_FATAL, 0, _("Network error on BlastAttributes.\n"));
667       return false;
668    }
669    
670    if (!bstrcmp(jcr->dir_bsock->msg, "1000 OK BlastAttr\n")) {
671       return false;
672    }
673    return true;
674 }
675
676 bool commit_attribute_spool(JCR *jcr)
677 {
678    boffset_t size, data_end;
679    char ec1[30];
680    char tbuf[100];
681    BSOCK *dir;
682
683    Dmsg1(100, "Commit attributes at %s\n", bstrftimes(tbuf, sizeof(tbuf),
684          (utime_t)time(NULL)));
685    if (are_attributes_spooled(jcr)) {
686       dir = jcr->dir_bsock;
687       if (fseeko(dir->m_spool_fd, 0, SEEK_END) != 0) {
688          berrno be;
689          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
690               be.bstrerror());
691          goto bail_out;
692       }
693       size = ftello(dir->m_spool_fd);
694       if (jcr->is_JobStatus(JS_Incomplete)) {
695          data_end = dir->get_data_end();
696          /* Check and truncate to last valid data_end if necssary */
697          if (size > data_end) {
698             if (ftruncate(fileno(dir->m_spool_fd), data_end) != 0) {
699                berrno be;
700                Jmsg(jcr, M_FATAL, 0, _("Truncate on attributes file failed: ERR=%s\n"),
701                     be.bstrerror());
702                goto bail_out;
703             }
704             Dmsg2(100, "=== Attrib spool truncated from %lld to %lld\n", 
705                   size, data_end);
706             size = data_end;
707          }
708       }
709       if (size < 0) {
710          berrno be;
711          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
712               be.bstrerror());
713          goto bail_out;
714       }
715       P(mutex);
716       if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
717          spool_stats.max_attr_size = spool_stats.attr_size + size;
718       }
719       spool_stats.attr_size += size;
720       V(mutex);
721       set_jcr_job_status(jcr, JS_AttrDespooling);
722       dir_send_job_status(jcr);
723       Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
724             edit_uint64_with_commas(size, ec1));
725
726       if (!blast_attr_spool_file(jcr, size)) {
727          /* Can't read spool file from director side,
728           * send content over network.
729           */
730          dir->despool(update_attr_spool_size, size);
731       }
732       return close_attr_spool_file(jcr, dir);
733    }
734    return true;
735
736 bail_out:
737    close_attr_spool_file(jcr, dir);
738    return false;
739 }
740
741 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
742 {
743    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
744
745    make_unique_spool_filename(jcr, &name, bs->m_fd);
746    bs->m_spool_fd = fopen(name, "w+b");
747    if (!bs->m_spool_fd) {
748       berrno be;
749       Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
750            be.bstrerror());
751       free_pool_memory(name);
752       return false;
753    }
754    P(mutex);
755    spool_stats.attr_jobs++;
756    V(mutex);
757    free_pool_memory(name);
758    return true;
759 }
760
761 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
762 {
763    POOLMEM *name;
764
765    char tbuf[100];
766
767    Dmsg1(100, "Close attr spool file at %s\n", bstrftimes(tbuf, sizeof(tbuf),
768          (utime_t)time(NULL)));
769    if (!bs->m_spool_fd) {
770       return true;
771    }
772    name = get_pool_memory(PM_MESSAGE);
773    P(mutex);
774    spool_stats.attr_jobs--;
775    spool_stats.total_attr_jobs++;
776    V(mutex);
777    make_unique_spool_filename(jcr, &name, bs->m_fd);
778    fclose(bs->m_spool_fd);
779    unlink(name);
780    free_pool_memory(name);
781    bs->m_spool_fd = NULL;
782    bs->clear_spooling();
783    return true;
784 }