]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
- Put Dmsg() on inside if() to avoid calling subroutine.
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2  *  Spooling code
3  *
4  *      Kern Sibbald, March 2004
5  *
6  *  Version $Id$
7  */
8 /*
9    Copyright (C) 2004-2005 Kern Sibbald
10
11    This program is free software; you can redistribute it and/or
12    modify it under the terms of the GNU General Public License
13    version 2 as ammended with additional clauses defined in the
14    file LICENSE in the main source directory.
15
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
19    the file LICENSE for additional details.
20
21  */
22
23 #include "bacula.h"
24 #include "stored.h"
25
26 /* Forward referenced subroutines */
27 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
28 static bool open_data_spool_file(DCR *dcr);
29 static bool close_data_spool_file(DCR *dcr);
30 static bool despool_data(DCR *dcr, bool commit);
31 static int  read_block_from_spool_file(DCR *dcr);
32 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
33 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
34 static bool write_spool_header(DCR *dcr);
35 static bool write_spool_data(DCR *dcr);
36
37 struct spool_stats_t {
38    uint32_t data_jobs;                /* current jobs spooling data */
39    uint32_t attr_jobs;
40    uint32_t total_data_jobs;          /* total jobs to have spooled data */
41    uint32_t total_attr_jobs;
42    int64_t max_data_size;             /* max data size */
43    int64_t max_attr_size;
44    int64_t data_size;                 /* current data size (all jobs running) */
45    int64_t attr_size;
46 };
47
48 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
49 spool_stats_t spool_stats;
50
51 /*
52  * Header for data spool record */
53 struct spool_hdr {
54    int32_t  FirstIndex;               /* FirstIndex for buffer */
55    int32_t  LastIndex;                /* LastIndex for buffer */
56    uint32_t len;                      /* length of next buffer */
57 };
58
59 enum {
60    RB_EOT = 1,
61    RB_ERROR,
62    RB_OK
63 };
64
65 void list_spool_stats(BSOCK *bs)
66 {
67    char ed1[30], ed2[30];
68    if (spool_stats.data_jobs || spool_stats.max_data_size) {
69       bnet_fsend(bs, "Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n",
70          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
71          spool_stats.total_data_jobs,
72          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
73    }
74    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
75       bnet_fsend(bs, "Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n",
76          spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
77          spool_stats.total_attr_jobs,
78          edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
79    }
80 }
81
82 bool begin_data_spool(DCR *dcr)
83 {
84    bool stat = true;
85    if (dcr->jcr->spool_data) {
86       Dmsg0(100, "Turning on data spooling\n");
87       dcr->spool_data = true;
88       stat = open_data_spool_file(dcr);
89       if (stat) {
90          dcr->spooling = true;
91          Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
92          P(mutex);
93          spool_stats.data_jobs++;
94          V(mutex);
95       }
96    }
97    return stat;
98 }
99
100 bool discard_data_spool(DCR *dcr)
101 {
102    if (dcr->spooling) {
103       Dmsg0(100, "Data spooling discarded\n");
104       return close_data_spool_file(dcr);
105    }
106    return true;
107 }
108
109 bool commit_data_spool(DCR *dcr)
110 {
111    bool stat;
112
113    if (dcr->spooling) {
114       Dmsg0(100, "Committing spooled data\n");
115       stat = despool_data(dcr, true /*commit*/);
116       if (!stat) {
117          Pmsg1(000, "Bad return from despool WroteVol=%d\n", dcr->WroteVol);
118          close_data_spool_file(dcr);
119          return false;
120       }
121       return close_data_spool_file(dcr);
122    }
123    return true;
124 }
125
126 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
127 {
128    const char *dir;
129    if (dcr->dev->device->spool_directory) {
130       dir = dcr->dev->device->spool_directory;
131    } else {
132       dir = working_directory;
133    }
134    Mmsg(name, "%s/%s.data.spool.%s.%s", dir, my_name, dcr->jcr->Job, 
135         dcr->device->hdr.name);
136 }
137
138
139 static bool open_data_spool_file(DCR *dcr)
140 {
141    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
142    int spool_fd;
143
144    make_unique_data_spool_filename(dcr, &name);
145    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
146       dcr->spool_fd = spool_fd;
147       dcr->jcr->spool_attributes = true;
148    } else {
149       berrno be;
150       Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
151            be.strerror());
152       free_pool_memory(name);
153       return false;
154    }
155    Dmsg1(100, "Created spool file: %s\n", name);
156    free_pool_memory(name);
157    return true;
158 }
159
160 static bool close_data_spool_file(DCR *dcr)
161 {
162    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
163
164    P(mutex);
165    spool_stats.data_jobs--;
166    spool_stats.total_data_jobs++;
167    if (spool_stats.data_size < dcr->spool_size) {
168       spool_stats.data_size = 0;
169    } else {
170       spool_stats.data_size -= dcr->spool_size;
171    }
172    dcr->spool_size = 0;
173    V(mutex);
174
175    make_unique_data_spool_filename(dcr, &name);
176    close(dcr->spool_fd);
177    dcr->spool_fd = -1;
178    dcr->spooling = false;
179    unlink(name);
180    Dmsg1(100, "Deleted spool file: %s\n", name);
181    free_pool_memory(name);
182    return true;
183 }
184
185 static const char *spool_name = "*spool*";
186
187 static bool despool_data(DCR *dcr, bool commit)
188 {
189    DEVICE *rdev;
190    DCR *rdcr;
191    bool ok = true;
192    DEV_BLOCK *block;
193    JCR *jcr = dcr->jcr;
194    int stat;
195    char ec1[50];
196
197    Dmsg0(100, "Despooling data\n");
198    Jmsg(jcr, M_INFO, 0, _("%s spooled data to Volume. Despooling %s bytes ...\n"),
199         commit?"Committing":"Writing",
200         edit_uint64_with_commas(jcr->dcr->spool_size, ec1));
201    dcr->spooling = false;
202    lock_device(dcr->dev);
203    dcr->dev_locked = true;
204
205    /*
206     * This is really quite kludgy and should be fixed some time.
207     * We create a dev structure to read from the spool file
208     * in rdev and rdcr.
209     */
210    rdev = (DEVICE *)malloc(sizeof(DEVICE));
211    memset(rdev, 0, sizeof(DEVICE));
212    rdev->dev_name = get_memory(strlen(spool_name)+1);
213    bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
214    rdev->errmsg = get_pool_memory(PM_EMSG);
215    *rdev->errmsg = 0;
216    rdev->max_block_size = dcr->dev->max_block_size;
217    rdev->min_block_size = dcr->dev->min_block_size;
218    rdev->device = dcr->dev->device;
219    rdcr = new_dcr(NULL, rdev);
220    rdcr->spool_fd = dcr->spool_fd;
221    rdcr->jcr = jcr;                   /* set a valid jcr */
222    block = dcr->block;                /* save block */
223    dcr->block = rdcr->block;          /* make read and write block the same */
224
225    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
226    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
227
228    for ( ; ok; ) {
229       if (job_canceled(jcr)) {
230          ok = false;
231          break;
232       }
233       stat = read_block_from_spool_file(rdcr);
234       if (stat == RB_EOT) {
235          break;
236       } else if (stat == RB_ERROR) {
237          ok = false;
238          break;
239       }
240       ok = write_block_to_device(dcr);
241       if (!ok) {
242          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
243                dcr->dev->print_name(), strerror_dev(dcr->dev));
244       }
245       Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
246    }
247    dcr->block = block;                /* reset block */
248
249    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
250    if (ftruncate(rdcr->spool_fd, 0) != 0) {
251       berrno be;
252       Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
253          be.strerror());
254       Pmsg1(000, "Bad return from ftruncate. ERR=%s\n", be.strerror());
255       ok = false;
256    }
257
258    P(mutex);
259    if (spool_stats.data_size < dcr->spool_size) {
260       spool_stats.data_size = 0;
261    } else {
262       spool_stats.data_size -= dcr->spool_size;
263    }
264    V(mutex);
265    P(dcr->dev->spool_mutex);
266    dcr->dev->spool_size -= dcr->spool_size;
267    dcr->spool_size = 0;               /* zap size in input dcr */
268    V(dcr->dev->spool_mutex);
269    free_memory(rdev->dev_name);
270    free_pool_memory(rdev->errmsg);
271    /* Be careful to NULL the jcr and free rdev after free_dcr() */
272    rdcr->jcr = NULL;
273    free_dcr(rdcr);
274    free(rdev);
275    unlock_device(dcr->dev);
276    dcr->dev_locked = false;
277    dcr->spooling = true;           /* turn on spooling again */
278    return ok;
279 }
280
281 /*
282  * Read a block from the spool file
283  *
284  *  Returns RB_OK on success
285  *          RB_EOT when file done
286  *          RB_ERROR on error
287  */
288 static int read_block_from_spool_file(DCR *dcr)
289 {
290    uint32_t rlen;
291    ssize_t stat;
292    spool_hdr hdr;
293    DEV_BLOCK *block = dcr->block;
294
295    rlen = sizeof(hdr);
296    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
297    if (stat == 0) {
298       Dmsg0(100, "EOT on spool read.\n");
299       return RB_EOT;
300    } else if (stat != (ssize_t)rlen) {
301       if (stat == -1) {
302          berrno be;
303          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
304               be.strerror());
305       } else {
306          Pmsg2(000, "Spool read error. Wanted %u bytes, got %d\n", rlen, stat);
307          Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
308       }
309       return RB_ERROR;
310    }
311    rlen = hdr.len;
312    if (rlen > block->buf_len) {
313       Pmsg2(000, "Spool block too big. Max %u bytes, got %u\n", block->buf_len, rlen);
314       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
315       return RB_ERROR;
316    }
317    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
318    if (stat != (ssize_t)rlen) {
319       Pmsg2(000, "Spool data read error. Wanted %u bytes, got %d\n", rlen, stat);
320       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
321       return RB_ERROR;
322    }
323    /* Setup write pointers */
324    block->binbuf = rlen;
325    block->bufp = block->buf + block->binbuf;
326    block->FirstIndex = hdr.FirstIndex;
327    block->LastIndex = hdr.LastIndex;
328    block->VolSessionId = dcr->jcr->VolSessionId;
329    block->VolSessionTime = dcr->jcr->VolSessionTime;
330    Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
331    return RB_OK;
332 }
333
334 /*
335  * Write a block to the spool file
336  *
337  *  Returns: true on success or EOT
338  *           false on hard error
339  */
340 bool write_block_to_spool_file(DCR *dcr)
341 {
342    uint32_t wlen, hlen;               /* length to write */
343    bool despool = false;
344    DEV_BLOCK *block = dcr->block;
345
346    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
347    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
348       return true;
349    }
350
351    hlen = sizeof(spool_hdr);
352    wlen = block->binbuf;
353    P(dcr->dev->spool_mutex);
354    dcr->spool_size += hlen + wlen;
355    dcr->dev->spool_size += hlen + wlen;
356    if ((dcr->max_spool_size > 0 && dcr->spool_size >= dcr->max_spool_size) ||
357        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
358       despool = true;
359    }
360    V(dcr->dev->spool_mutex);
361    P(mutex);
362    spool_stats.data_size += hlen + wlen;
363    if (spool_stats.data_size > spool_stats.max_data_size) {
364       spool_stats.max_data_size = spool_stats.data_size;
365    }
366    V(mutex);
367    if (despool) {
368 #ifdef xDEBUG
369       char ec1[30], ec2[30], ec3[30], ec4[30];
370       Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
371             "max_job_size=%s job_size=%s\n",
372             edit_uint64_with_commas(dcr->max_spool_size, ec1),
373             edit_uint64_with_commas(dcr->spool_size, ec2),
374             edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
375             edit_uint64_with_commas(dcr->dev->spool_size, ec4));
376 #endif
377       Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
378       if (!despool_data(dcr, false)) {
379          Pmsg0(000, "Bad return from despool in write_block.\n");
380          return false;
381       }
382       /* Despooling cleared these variables so reset them */
383       P(dcr->dev->spool_mutex);
384       dcr->spool_size += hlen + wlen;
385       dcr->dev->spool_size += hlen + wlen;
386       V(dcr->dev->spool_mutex);
387       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
388    }
389
390
391    if (!write_spool_header(dcr)) {
392       return false;
393    }
394    if (!write_spool_data(dcr)) {
395      return false;
396    }
397
398    Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
399    empty_block(block);
400    return true;
401 }
402
403 static bool write_spool_header(DCR *dcr)
404 {
405    spool_hdr hdr;
406    ssize_t stat;
407    DEV_BLOCK *block = dcr->block;
408
409    hdr.FirstIndex = block->FirstIndex;
410    hdr.LastIndex = block->LastIndex;
411    hdr.len = block->binbuf;
412
413    /* Write header */
414    for (int retry=0; retry<=1; retry++) {
415       stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
416       if (stat == -1) {
417          berrno be;
418          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
419               be.strerror());
420       }
421       if (stat != (ssize_t)sizeof(hdr)) {
422          /* If we wrote something, truncate it, then despool */
423          if (stat != -1) {
424             if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR) - stat) != 0) {
425                berrno be;
426                Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
427                   be.strerror());
428                return false;
429             }
430          }
431          if (!despool_data(dcr, false)) {
432             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
433             return false;
434          }
435          continue;                    /* try again */
436       }
437       return true;
438    }
439    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
440    return false;
441 }
442
443 static bool write_spool_data(DCR *dcr)
444 {
445    ssize_t stat;
446    DEV_BLOCK *block = dcr->block;
447
448    /* Write data */
449    for (int retry=0; retry<=1; retry++) {
450       stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
451       if (stat == -1) {
452          berrno be;
453          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
454               be.strerror());
455       }
456       if (stat != (ssize_t)block->binbuf) {
457          /*
458           * If we wrote something, truncate it and the header, then despool
459           */
460          if (stat != -1) {
461             if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR)
462                       - stat - sizeof(spool_hdr)) != 0) {
463                berrno be;
464                Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
465                   be.strerror());
466                return false;
467             }
468          }
469          if (!despool_data(dcr, false)) {
470             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
471             return false;
472          }
473          if (!write_spool_header(dcr)) {
474             return false;
475          }
476          continue;                    /* try again */
477       }
478       return true;
479    }
480    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
481    return false;
482 }
483
484
485
486 bool are_attributes_spooled(JCR *jcr)
487 {
488    return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
489 }
490
491 /*
492  * Create spool file for attributes.
493  *  This is done by "attaching" to the bsock, and when
494  *  it is called, the output is written to a file.
495  *  The actual spooling is turned on and off in
496  *  append.c only during writing of the attributes.
497  */
498 bool begin_attribute_spool(JCR *jcr)
499 {
500    if (!jcr->no_attributes && jcr->spool_attributes) {
501       return open_attr_spool_file(jcr, jcr->dir_bsock);
502    }
503    return true;
504 }
505
506 bool discard_attribute_spool(JCR *jcr)
507 {
508    if (are_attributes_spooled(jcr)) {
509       return close_attr_spool_file(jcr, jcr->dir_bsock);
510    }
511    return true;
512 }
513
514 static void update_attr_spool_size(ssize_t size)
515 {
516    P(mutex);
517    if (size > 0) {
518      if ((spool_stats.attr_size - size) > 0) {
519         spool_stats.attr_size -= size;
520      } else {
521         spool_stats.attr_size = 0;
522      }
523    }
524    V(mutex);
525 }
526
527 bool commit_attribute_spool(JCR *jcr)
528 {
529    off_t size;
530    char ec1[30];
531
532    if (are_attributes_spooled(jcr)) {
533       if (fseeko(jcr->dir_bsock->spool_fd, 0, SEEK_END) != 0) {
534          berrno be;
535          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
536               be.strerror());
537          goto bail_out;
538       }
539       size = ftello(jcr->dir_bsock->spool_fd);
540       if (size < 0) {
541          berrno be;
542          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
543               be.strerror());
544          goto bail_out;
545       }
546       P(mutex);
547       if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
548          spool_stats.max_attr_size = spool_stats.attr_size + size;
549       }
550       spool_stats.attr_size += size;
551       V(mutex);
552       Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
553             edit_uint64_with_commas(size, ec1));
554       bnet_despool_to_bsock(jcr->dir_bsock, update_attr_spool_size, size);
555       return close_attr_spool_file(jcr, jcr->dir_bsock);
556    }
557    return true;
558
559 bail_out:
560    close_attr_spool_file(jcr, jcr->dir_bsock);
561    return false;
562 }
563
564 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
565 {
566    Mmsg(name, "%s/%s.attr.spool.%s.%d", working_directory, my_name,
567       jcr->Job, fd);
568 }
569
570
571 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
572 {
573    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
574
575    make_unique_spool_filename(jcr, &name, bs->fd);
576    bs->spool_fd = fopen(name, "w+");
577    if (!bs->spool_fd) {
578       berrno be;
579       Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
580            be.strerror());
581       free_pool_memory(name);
582       return false;
583    }
584    P(mutex);
585    spool_stats.attr_jobs++;
586    V(mutex);
587    free_pool_memory(name);
588    return true;
589 }
590
591 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
592 {
593    POOLMEM *name;
594
595    if (!bs->spool_fd) {
596       return true;
597    }
598    name = get_pool_memory(PM_MESSAGE);
599    P(mutex);
600    spool_stats.attr_jobs--;
601    spool_stats.total_attr_jobs++;
602    V(mutex);
603    make_unique_spool_filename(jcr, &name, bs->fd);
604    fclose(bs->spool_fd);
605    unlink(name);
606    free_pool_memory(name);
607    bs->spool_fd = NULL;
608    bs->spool = false;
609    return true;
610 }