]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
Change copyright as per agreement with FSFE
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2016 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is 
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  *  Spooling code
21  *
22  *      Kern Sibbald, March 2004
23  *
24  */
25
26 #include "bacula.h"
27 #include "stored.h"
28
29 /* Forward referenced subroutines */
30 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
31 static bool open_data_spool_file(DCR *dcr);
32 static bool close_data_spool_file(DCR *dcr);
33 static bool despool_data(DCR *dcr, bool commit);
34 static int  read_block_from_spool_file(DCR *dcr);
35 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
36 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
37 static bool write_spool_header(DCR *dcr);
38 static bool write_spool_data(DCR *dcr);
39
40 struct spool_stats_t {
41    uint32_t data_jobs;                /* current jobs spooling data */
42    uint32_t attr_jobs;
43    uint32_t total_data_jobs;          /* total jobs to have spooled data */
44    uint32_t total_attr_jobs;
45    int64_t max_data_size;             /* max data size */
46    int64_t max_attr_size;
47    int64_t data_size;                 /* current data size (all jobs running) */
48    int64_t attr_size;
49 };
50
51 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
52 spool_stats_t spool_stats;
53
54 /*
55  * Header for data spool record */
56 struct spool_hdr {
57    int32_t  FirstIndex;               /* FirstIndex for buffer */
58    int32_t  LastIndex;                /* LastIndex for buffer */
59    uint32_t len;                      /* length of next buffer */
60 };
61
62 enum {
63    RB_EOT = 1,
64    RB_ERROR,
65    RB_OK
66 };
67
68 void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
69 {
70    char ed1[30], ed2[30];
71    POOL_MEM msg(PM_MESSAGE);
72    int len;
73
74    len = Mmsg(msg, _("Spooling statistics:\n"));
75
76    if (spool_stats.data_jobs || spool_stats.max_data_size) {
77       len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
78          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
79          spool_stats.total_data_jobs,
80          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
81
82       sendit(msg.c_str(), len, arg);
83    }
84    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
85       len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
86          spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
87          spool_stats.total_attr_jobs,
88          edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
89
90       sendit(msg.c_str(), len, arg);
91    }
92 }
93
94 bool begin_data_spool(DCR *dcr)
95 {
96    bool stat = true;
97    if (!dcr->dev->is_dvd() && dcr->jcr->spool_data) {
98       Dmsg0(100, "Turning on data spooling\n");
99       dcr->spool_data = true;
100       stat = open_data_spool_file(dcr);
101       if (stat) {
102          dcr->spooling = true;
103          Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
104          P(mutex);
105          spool_stats.data_jobs++;
106          V(mutex);
107       }
108    }
109    return stat;
110 }
111
112 bool discard_data_spool(DCR *dcr)
113 {
114    if (dcr->spooling) {
115       Dmsg0(100, "Data spooling discarded\n");
116       return close_data_spool_file(dcr);
117    }
118    return true;
119 }
120
121 bool commit_data_spool(DCR *dcr)
122 {
123    bool stat;
124
125    if (dcr->spooling) {
126       Dmsg0(100, "Committing spooled data\n");
127       stat = despool_data(dcr, true /*commit*/);
128       if (!stat) {
129          Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
130          close_data_spool_file(dcr);
131          return false;
132       }
133       return close_data_spool_file(dcr);
134    }
135    return true;
136 }
137
138 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
139 {
140    const char *dir;
141    if (dcr->dev->device->spool_directory) {
142       dir = dcr->dev->device->spool_directory;
143    } else {
144       dir = working_directory;
145    }
146    Mmsg(name, "%s/%s.data.%u.%s.%s.spool", dir, my_name, dcr->jcr->JobId,
147         dcr->jcr->Job, dcr->device->hdr.name);
148 }
149
150
151 static bool open_data_spool_file(DCR *dcr)
152 {
153    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
154    int spool_fd;
155
156    make_unique_data_spool_filename(dcr, &name);
157    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
158       dcr->spool_fd = spool_fd;
159       dcr->jcr->spool_attributes = true;
160    } else {
161       berrno be;
162       Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
163            be.bstrerror());
164       free_pool_memory(name);
165       return false;
166    }
167    Dmsg1(100, "Created spool file: %s\n", name);
168    free_pool_memory(name);
169    return true;
170 }
171
172 static const char *spool_name = "*spool*";
173
174 /*
175  * NB! This routine locks the device, but if committing will
176  *     not unlock it. If not committing, it will be unlocked.
177  */
178 static bool despool_data(DCR *dcr, bool commit)
179 {
180    DEVICE *rdev;
181    DCR *rdcr;
182    bool ok = true;
183    DEV_BLOCK *block;
184    JCR *jcr = dcr->jcr;
185    int stat;
186    char ec1[50];
187
188    Dmsg0(100, "Despooling data\n");
189    if (jcr->dcr->job_spool_size == 0) {
190       Jmsg(jcr, M_WARNING, 0, _("Despooling zero bytes. Your disk is probably FULL!\n"));
191    }
192
193    /*
194     * Commit means that the job is done, so we commit, otherwise, we
195     *  are despooling because of user spool size max or some error
196     *  (e.g. filesystem full).
197     */
198    if (commit) {
199       Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
200          jcr->dcr->VolumeName,
201          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
202       jcr->setJobStatus(JS_DataCommitting);
203    } else {
204       Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
205          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
206       jcr->setJobStatus(JS_DataDespooling);
207    }
208    jcr->sendJobStatus(JS_DataDespooling);
209    dcr->despool_wait = true;
210    dcr->spooling = false;
211    /*
212     * We work with device blocked, but not locked so that
213     *  other threads -- e.g. reservations can lock the device
214     *  structure.
215     */
216    dcr->dblock(BST_DESPOOLING);
217    dcr->despool_wait = false;
218    dcr->despooling = true;
219
220    /*
221     * This is really quite kludgy and should be fixed some time.
222     * We create a dev structure to read from the spool file
223     * in rdev and rdcr.
224     */
225    rdev = (DEVICE *)malloc(sizeof(DEVICE));
226    memset(rdev, 0, sizeof(DEVICE));
227    rdev->dev_name = get_memory(strlen(spool_name)+1);
228    bstrncpy(rdev->dev_name, spool_name, strlen(spool_name)+1);
229    rdev->errmsg = get_pool_memory(PM_EMSG);
230    *rdev->errmsg = 0;
231    rdev->max_block_size = dcr->dev->max_block_size;
232    rdev->min_block_size = dcr->dev->min_block_size;
233    rdev->device = dcr->dev->device;
234    rdcr = new_dcr(jcr, NULL, rdev, SD_READ);
235    rdcr->spool_fd = dcr->spool_fd;
236    block = dcr->block;                /* save block */
237    dcr->block = rdcr->block;          /* make read and write block the same */
238
239    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
240    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
241
242 #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
243    posix_fadvise(rdcr->spool_fd, 0, 0, POSIX_FADV_WILLNEED);
244 #endif
245
246    /* Add run time, to get current wait time */
247    int32_t despool_start = time(NULL) - jcr->run_time;
248
249    set_new_file_parameters(dcr);
250
251    for ( ; ok; ) {
252       if (job_canceled(jcr)) {
253          ok = false;
254          break;
255       }
256       stat = read_block_from_spool_file(rdcr);
257       if (stat == RB_EOT) {
258          break;
259       } else if (stat == RB_ERROR) {
260          ok = false;
261          break;
262       }
263       ok = dcr->write_block_to_device();
264       if (!ok) {
265          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
266                dcr->dev->print_name(), dcr->dev->bstrerror());
267          Pmsg2(000, "Fatal append error on device %s: ERR=%s\n",
268                dcr->dev->print_name(), dcr->dev->bstrerror());
269          /* Force in case Incomplete set */
270          jcr->forceJobStatus(JS_FatalError);
271       }
272       Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
273    }
274
275    if (!dir_create_jobmedia_record(dcr)) {
276       Jmsg2(jcr, M_FATAL, 0, _("Could not create JobMedia record for Volume=\"%s\" Job=%s\n"),
277          dcr->getVolCatName(), jcr->Job);
278       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
279    }
280    flush_jobmedia_queue(jcr);
281    /* Set new file/block parameters for current dcr */
282    set_new_file_parameters(dcr);
283
284    /*
285     * Subtracting run_time give us elapsed time - wait_time since
286     * we started despooling. Note, don't use time_t as it is 32 or 64
287     * bits depending on the OS and doesn't edit with %d
288     */
289    int32_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
290
291    if (despool_elapsed <= 0) {
292       despool_elapsed = 1;
293    }
294
295    Jmsg(jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s Bytes/second\n"),
296          despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
297          edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
298
299    dcr->block = block;                /* reset block */
300
301    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
302    if (ftruncate(rdcr->spool_fd, 0) != 0) {
303       berrno be;
304       Jmsg(jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
305          be.bstrerror());
306       /* Note, try continuing despite ftruncate problem */
307    }
308
309    P(mutex);
310    if (spool_stats.data_size < dcr->job_spool_size) {
311       spool_stats.data_size = 0;
312    } else {
313       spool_stats.data_size -= dcr->job_spool_size;
314    }
315    V(mutex);
316    P(dcr->dev->spool_mutex);
317    dcr->dev->spool_size -= dcr->job_spool_size;
318    dcr->job_spool_size = 0;            /* zap size in input dcr */
319    V(dcr->dev->spool_mutex);
320    free_memory(rdev->dev_name);
321    free_pool_memory(rdev->errmsg);
322    /* Be careful to NULL the jcr and free rdev after free_dcr() */
323    rdcr->jcr = NULL;
324    rdcr->set_dev(NULL);
325    free_dcr(rdcr);
326    free(rdev);
327    dcr->spooling = true;           /* turn on spooling again */
328    dcr->despooling = false;
329    /*
330     * Note, if committing we leave the device blocked. It will be removed in
331     *  release_device();
332     */
333    if (!commit) {
334       dcr->dev->dunblock();
335    }
336    jcr->sendJobStatus(JS_Running);
337    return ok;
338 }
339
340 /*
341  * Read a block from the spool file
342  *
343  *  Returns RB_OK on success
344  *          RB_EOT when file done
345  *          RB_ERROR on error
346  */
347 static int read_block_from_spool_file(DCR *dcr)
348 {
349    uint32_t rlen;
350    ssize_t stat;
351    spool_hdr hdr;
352    DEV_BLOCK *block = dcr->block;
353    JCR *jcr = dcr->jcr;
354
355    rlen = sizeof(hdr);
356    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
357    if (stat == 0) {
358       Dmsg0(100, "EOT on spool read.\n");
359       return RB_EOT;
360    } else if (stat != (ssize_t)rlen) {
361       if (stat == -1) {
362          berrno be;
363          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
364               be.bstrerror());
365       } else {
366          Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
367          Jmsg2(jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
368       }
369       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
370       return RB_ERROR;
371    }
372    rlen = hdr.len;
373    if (rlen > block->buf_len) {
374       Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
375       Jmsg2(jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
376       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
377       return RB_ERROR;
378    }
379    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
380    if (stat != (ssize_t)rlen) {
381       Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
382       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
383       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
384       return RB_ERROR;
385    }
386    /* Setup write pointers */
387    block->binbuf = rlen;
388    block->bufp = block->buf + block->binbuf;
389    block->FirstIndex = hdr.FirstIndex;
390    block->LastIndex = hdr.LastIndex;
391    block->VolSessionId = dcr->jcr->VolSessionId;
392    block->VolSessionTime = dcr->jcr->VolSessionTime;
393    Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
394    return RB_OK;
395 }
396
397 /*
398  * Write a block to the spool file
399  *
400  *  Returns: true on success or EOT
401  *           false on hard error
402  */
403 bool write_block_to_spool_file(DCR *dcr)
404 {
405    uint32_t wlen, hlen;               /* length to write */
406    bool despool = false;
407    DEV_BLOCK *block = dcr->block;
408
409    if (job_canceled(dcr->jcr)) {
410       return false;
411    }
412    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
413    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
414       return true;
415    }
416
417    hlen = sizeof(spool_hdr);
418    wlen = block->binbuf;
419    P(dcr->dev->spool_mutex);
420    dcr->job_spool_size += hlen + wlen;
421    dcr->dev->spool_size += hlen + wlen;
422    if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
423        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
424       despool = true;
425    }
426    V(dcr->dev->spool_mutex);
427    P(mutex);
428    spool_stats.data_size += hlen + wlen;
429    if (spool_stats.data_size > spool_stats.max_data_size) {
430       spool_stats.max_data_size = spool_stats.data_size;
431    }
432    V(mutex);
433    if (despool) {
434       char ec1[30], ec2[30];
435       if (dcr->max_job_spool_size > 0) {
436          Jmsg(dcr->jcr, M_INFO, 0, _("User specified Job spool size reached: "
437             "JobSpoolSize=%s MaxJobSpoolSize=%s\n"),
438             edit_uint64_with_commas(dcr->job_spool_size, ec1),
439             edit_uint64_with_commas(dcr->max_job_spool_size, ec2));
440       } else {
441          Jmsg(dcr->jcr, M_INFO, 0, _("User specified Device spool size reached: "
442             "DevSpoolSize=%s MaxDevSpoolSize=%s\n"),
443             edit_uint64_with_commas(dcr->dev->spool_size, ec1),
444             edit_uint64_with_commas(dcr->dev->max_spool_size, ec2));
445       }
446
447       if (!despool_data(dcr, false)) {
448          Pmsg0(000, _("Bad return from despool in write_block.\n"));
449          return false;
450       }
451       /* Despooling cleared these variables so reset them */
452       P(dcr->dev->spool_mutex);
453       dcr->job_spool_size += hlen + wlen;
454       dcr->dev->spool_size += hlen + wlen;
455       V(dcr->dev->spool_mutex);
456       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
457    }
458
459
460    if (!write_spool_header(dcr)) {
461       return false;
462    }
463    if (!write_spool_data(dcr)) {
464      return false;
465    }
466
467    Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
468    empty_block(block);
469    return true;
470 }
471
472 static bool write_spool_header(DCR *dcr)
473 {
474    spool_hdr hdr;
475    ssize_t stat;
476    DEV_BLOCK *block = dcr->block;
477    JCR *jcr = dcr->jcr;
478
479    hdr.FirstIndex = block->FirstIndex;
480    hdr.LastIndex = block->LastIndex;
481    hdr.len = block->binbuf;
482
483    /* Write header */
484    for (int retry=0; retry<=1; retry++) {
485       stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
486       if (stat == -1) {
487          berrno be;
488          Jmsg(jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
489               be.bstrerror());
490          jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
491       }
492       if (stat != (ssize_t)sizeof(hdr)) {
493          Jmsg(jcr, M_ERROR, 0, _("Error writing header to spool file."
494               " Disk probably full. Attempting recovery. Wanted to write=%d got=%d\n"),
495               (int)stat, (int)sizeof(hdr));
496          /* If we wrote something, truncate it, then despool */
497          if (stat != -1) {
498 #if defined(HAVE_WIN32)
499             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
500 #else
501             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
502 #endif
503             if (ftruncate(dcr->spool_fd, pos - stat) != 0) {
504                berrno be;
505                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
506                   be.bstrerror());
507               /* Note, try continuing despite ftruncate problem */
508             }
509          }
510          if (!despool_data(dcr, false)) {
511             Jmsg(jcr, M_FATAL, 0, _("Fatal despooling error."));
512             jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
513             return false;
514          }
515          continue;                    /* try again */
516       }
517       return true;
518    }
519    Jmsg(jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
520    jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
521    return false;
522 }
523
524 static bool write_spool_data(DCR *dcr)
525 {
526    ssize_t stat;
527    DEV_BLOCK *block = dcr->block;
528    JCR *jcr = dcr->jcr;
529
530    /* Write data */
531    for (int retry=0; retry<=1; retry++) {
532       stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
533       if (stat == -1) {
534          berrno be;
535          Jmsg(jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
536               be.bstrerror());
537          jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
538       }
539       if (stat != (ssize_t)block->binbuf) {
540          /*
541           * If we wrote something, truncate it and the header, then despool
542           */
543          if (stat != -1) {
544 #if defined(HAVE_WIN32)
545             boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
546 #else
547             boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
548 #endif
549             if (ftruncate(dcr->spool_fd, pos - stat - sizeof(spool_hdr)) != 0) {
550                berrno be;
551                Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
552                   be.bstrerror());
553                /* Note, try continuing despite ftruncate problem */
554             }
555          }
556          if (!despool_data(dcr, false)) {
557             Jmsg(jcr, M_FATAL, 0, _("Fatal despooling error."));
558             jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
559             return false;
560          }
561          if (!write_spool_header(dcr)) {
562             return false;
563          }
564          continue;                    /* try again */
565       }
566       return true;
567    }
568    Jmsg(jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
569    jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
570    return false;
571 }
572
573 static bool close_data_spool_file(DCR *dcr)
574 {
575    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
576
577    P(mutex);
578    spool_stats.data_jobs--;
579    spool_stats.total_data_jobs++;
580    if (spool_stats.data_size < dcr->job_spool_size) {
581       spool_stats.data_size = 0;
582    } else {
583       spool_stats.data_size -= dcr->job_spool_size;
584    }
585    V(mutex);
586    P(dcr->dev->spool_mutex);
587    dcr->job_spool_size = 0;
588    V(dcr->dev->spool_mutex);
589
590    make_unique_data_spool_filename(dcr, &name);
591    close(dcr->spool_fd);
592    dcr->spool_fd = -1;
593    dcr->spooling = false;
594    unlink(name);
595    Dmsg1(100, "Deleted spool file: %s\n", name);
596    free_pool_memory(name);
597    return true;
598 }
599
600 bool are_attributes_spooled(JCR *jcr)
601 {
602    return jcr->spool_attributes && jcr->dir_bsock->m_spool_fd;
603 }
604
605 /*
606  * Create spool file for attributes.
607  *  This is done by "attaching" to the bsock, and when
608  *  it is called, the output is written to a file.
609  *  The actual spooling is turned on and off in
610  *  append.c only during writing of the attributes.
611  */
612 bool begin_attribute_spool(JCR *jcr)
613 {
614    if (!jcr->no_attributes && jcr->spool_attributes) {
615       return open_attr_spool_file(jcr, jcr->dir_bsock);
616    }
617    return true;
618 }
619
620 static void update_attr_spool_size(ssize_t size)
621 {
622    P(mutex);
623    if (size > 0) {
624      if ((spool_stats.attr_size - size) > 0) {
625         spool_stats.attr_size -= size;
626      } else {
627         spool_stats.attr_size = 0;
628      }
629    }
630    V(mutex);
631 }
632
633 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
634 {
635    Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
636       jcr->Job, fd);
637 }
638
639 /*
640  * Tell Director where to find the attributes spool file
641  *  Note, if we are not on the same machine, the Director will
642  *  return an error, and the higher level routine will transmit
643  *  the data record by record -- using bsock->despool().
644  */
645 static bool blast_attr_spool_file(JCR *jcr, boffset_t size)
646 {
647    /* send full spool file name */
648    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
649    make_unique_spool_filename(jcr, &name, jcr->dir_bsock->m_fd);
650    bash_spaces(name);
651    jcr->dir_bsock->fsend("BlastAttr Job=%s File=%s\n", jcr->Job, name);
652    free_pool_memory(name);
653
654    if (jcr->dir_bsock->recv() <= 0) {
655       Jmsg(jcr, M_FATAL, 0, _("Network error on BlastAttributes.\n"));
656       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
657       return false;
658    }
659
660    if (!bstrcmp(jcr->dir_bsock->msg, "1000 OK BlastAttr\n")) {
661       return false;
662    }
663    return true;
664 }
665
666 bool commit_attribute_spool(JCR *jcr)
667 {
668    boffset_t size, data_end;
669    char ec1[30];
670    char tbuf[100];
671    BSOCK *dir;
672
673    Dmsg1(100, "Commit attributes at %s\n", bstrftimes(tbuf, sizeof(tbuf),
674          (utime_t)time(NULL)));
675    if (are_attributes_spooled(jcr)) {
676       dir = jcr->dir_bsock;
677       if (fseeko(dir->m_spool_fd, 0, SEEK_END) != 0) {
678          berrno be;
679          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
680               be.bstrerror());
681          jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
682          goto bail_out;
683       }
684       size = ftello(dir->m_spool_fd);
685       /* For Incomplete Job truncate spool file to last valid data_end if necssary */
686       if (jcr->is_JobStatus(JS_Incomplete)) {
687          data_end = dir->get_last_data_end();
688          if (size > data_end) {
689             if (ftruncate(fileno(dir->m_spool_fd), data_end) != 0) {
690                berrno be;
691                Jmsg(jcr, M_FATAL, 0, _("Truncate on attributes file failed: ERR=%s\n"),
692                     be.bstrerror());
693                jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
694                goto bail_out;
695             }
696             Dmsg2(100, "=== Attrib spool truncated from %lld to %lld\n",
697                   size, data_end);
698             size = data_end;
699          }
700       }
701       if (size < 0) {
702          berrno be;
703          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
704               be.bstrerror());
705          jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
706          goto bail_out;
707       }
708       P(mutex);
709       if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
710          spool_stats.max_attr_size = spool_stats.attr_size + size;
711       }
712       spool_stats.attr_size += size;
713       V(mutex);
714       jcr->sendJobStatus(JS_AttrDespooling);
715       Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
716             edit_uint64_with_commas(size, ec1));
717
718       if (!blast_attr_spool_file(jcr, size)) {
719          /* Can't read spool file from director side,
720           * send content over network.
721           */
722          dir->despool(update_attr_spool_size, size);
723       }
724       return close_attr_spool_file(jcr, dir);
725    }
726    return true;
727
728 bail_out:
729    close_attr_spool_file(jcr, dir);
730    return false;
731 }
732
733 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
734 {
735    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
736
737    make_unique_spool_filename(jcr, &name, bs->m_fd);
738    bs->m_spool_fd = fopen(name, "w+b");
739    if (!bs->m_spool_fd) {
740       berrno be;
741       Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
742            be.bstrerror());
743       jcr->forceJobStatus(JS_FatalError);  /* override any Incomplete */
744       free_pool_memory(name);
745       return false;
746    }
747    P(mutex);
748    spool_stats.attr_jobs++;
749    V(mutex);
750    free_pool_memory(name);
751    return true;
752 }
753
754 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
755 {
756    POOLMEM *name;
757
758    char tbuf[100];
759
760    Dmsg1(100, "Close attr spool file at %s\n", bstrftimes(tbuf, sizeof(tbuf),
761          (utime_t)time(NULL)));
762    if (!bs->m_spool_fd) {
763       return true;
764    }
765    name = get_pool_memory(PM_MESSAGE);
766    P(mutex);
767    spool_stats.attr_jobs--;
768    spool_stats.total_attr_jobs++;
769    V(mutex);
770    make_unique_spool_filename(jcr, &name, bs->m_fd);
771    fclose(bs->m_spool_fd);
772    unlink(name);
773    free_pool_memory(name);
774    bs->m_spool_fd = NULL;
775    bs->clear_spooling();
776    return true;
777 }
778
779 bool discard_attribute_spool(JCR *jcr)
780 {
781    if (are_attributes_spooled(jcr)) {
782       return close_attr_spool_file(jcr, jcr->dir_bsock);
783    }
784    return true;
785 }