]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
- Remove Created new FileSet message as it always comes out in
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2  *  Spooling code
3  *
4  *      Kern Sibbald, March 2004
5  *
6  *  Version $Id$
7  */
8 /*
9    Copyright (C) 2004-2005 Kern Sibbald
10
11    This program is free software; you can redistribute it and/or
12    modify it under the terms of the GNU General Public License as
13    published by the Free Software Foundation; either version 2 of
14    the License, or (at your option) any later version.
15
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19    General Public License for more details.
20
21    You should have received a copy of the GNU General Public
22    License along with this program; if not, write to the Free
23    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
24    MA 02111-1307, USA.
25
26  */
27
28 #include "bacula.h"
29 #include "stored.h"
30
31 /* Forward referenced subroutines */
32 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
33 static bool open_data_spool_file(DCR *dcr);
34 static bool close_data_spool_file(DCR *dcr);
35 static bool despool_data(DCR *dcr, bool commit);
36 static int  read_block_from_spool_file(DCR *dcr);
37 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
38 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
39 static bool write_spool_header(DCR *dcr);
40 static bool write_spool_data(DCR *dcr);
41
42 struct spool_stats_t {
43    uint32_t data_jobs;                /* current jobs spooling data */
44    uint32_t attr_jobs;
45    uint32_t total_data_jobs;          /* total jobs to have spooled data */
46    uint32_t total_attr_jobs;
47    int64_t max_data_size;             /* max data size */
48    int64_t max_attr_size;
49    int64_t data_size;                 /* current data size (all jobs running) */
50    int64_t attr_size;
51 };
52
53 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
54 spool_stats_t spool_stats;
55
56 /*
57  * Header for data spool record */
58 struct spool_hdr {
59    int32_t  FirstIndex;               /* FirstIndex for buffer */
60    int32_t  LastIndex;                /* LastIndex for buffer */
61    uint32_t len;                      /* length of next buffer */
62 };
63
64 enum {
65    RB_EOT = 1,
66    RB_ERROR,
67    RB_OK
68 };
69
70 void list_spool_stats(BSOCK *bs)
71 {
72    char ed1[30], ed2[30];
73    if (spool_stats.data_jobs || spool_stats.max_data_size) {
74       bnet_fsend(bs, "Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n",
75          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
76          spool_stats.total_data_jobs,
77          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
78    }
79    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
80       bnet_fsend(bs, "Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n",
81          spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
82          spool_stats.total_attr_jobs,
83          edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
84    }
85 }
86
87 bool begin_data_spool(DCR *dcr)
88 {
89    bool stat = true;
90    if (dcr->jcr->spool_data) {
91       Dmsg0(100, "Turning on data spooling\n");
92       dcr->spool_data = true;
93       stat = open_data_spool_file(dcr);
94       if (stat) {
95          dcr->spooling = true;
96          Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
97          P(mutex);
98          spool_stats.data_jobs++;
99          V(mutex);
100       }
101    }
102    return stat;
103 }
104
105 bool discard_data_spool(DCR *dcr)
106 {
107    if (dcr->spooling) {
108       Dmsg0(100, "Data spooling discarded\n");
109       return close_data_spool_file(dcr);
110    }
111    return true;
112 }
113
114 bool commit_data_spool(DCR *dcr)
115 {
116    bool stat;
117
118    if (dcr->spooling) {
119       Dmsg0(100, "Committing spooled data\n");
120       stat = despool_data(dcr, true /*commit*/);
121       if (!stat) {
122          Pmsg1(000, "Bad return from despool WroteVol=%d\n", dcr->WroteVol);
123          close_data_spool_file(dcr);
124          return false;
125       }
126       return close_data_spool_file(dcr);
127    }
128    return true;
129 }
130
131 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
132 {
133    const char *dir;
134    if (dcr->dev->device->spool_directory) {
135       dir = dcr->dev->device->spool_directory;
136    } else {
137       dir = working_directory;
138    }
139    Mmsg(name, "%s/%s.data.spool.%s.%s", dir, my_name, dcr->jcr->Job, 
140         dcr->device->hdr.name);
141 }
142
143
144 static bool open_data_spool_file(DCR *dcr)
145 {
146    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
147    int spool_fd;
148
149    make_unique_data_spool_filename(dcr, &name);
150    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
151       dcr->spool_fd = spool_fd;
152       dcr->jcr->spool_attributes = true;
153    } else {
154       berrno be;
155       Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
156            be.strerror());
157       free_pool_memory(name);
158       return false;
159    }
160    Dmsg1(100, "Created spool file: %s\n", name);
161    free_pool_memory(name);
162    return true;
163 }
164
165 static bool close_data_spool_file(DCR *dcr)
166 {
167    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
168
169    P(mutex);
170    spool_stats.data_jobs--;
171    spool_stats.total_data_jobs++;
172    if (spool_stats.data_size < dcr->spool_size) {
173       spool_stats.data_size = 0;
174    } else {
175       spool_stats.data_size -= dcr->spool_size;
176    }
177    dcr->spool_size = 0;
178    V(mutex);
179
180    make_unique_data_spool_filename(dcr, &name);
181    close(dcr->spool_fd);
182    dcr->spool_fd = -1;
183    dcr->spooling = false;
184    unlink(name);
185    Dmsg1(100, "Deleted spool file: %s\n", name);
186    free_pool_memory(name);
187    return true;
188 }
189
190 static const char *spool_name = "*spool*";
191
192 static bool despool_data(DCR *dcr, bool commit)
193 {
194    DEVICE *rdev;
195    DCR *rdcr;
196    bool ok = true;
197    DEV_BLOCK *block;
198    JCR *jcr = dcr->jcr;
199    int stat;
200    char ec1[50];
201
202    Dmsg0(100, "Despooling data\n");
203    Jmsg(jcr, M_INFO, 0, _("%s spooled data to Volume. Despooling %s bytes ...\n"),
204         commit?"Committing":"Writing",
205         edit_uint64_with_commas(jcr->dcr->spool_size, ec1));
206    dcr->spooling = false;
207    lock_device(dcr->dev);
208    dcr->dev_locked = true;
209
210    /*
211     * This is really quite kludgy and should be fixed some time.
212     * We create a dev structure to read from the spool file
213     * in rdev and rdcr.
214     */
215    rdev = (DEVICE *)malloc(sizeof(DEVICE));
216    memset(rdev, 0, sizeof(DEVICE));
217    rdev->dev_name = get_memory(strlen(spool_name)+1);
218    bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
219    rdev->errmsg = get_pool_memory(PM_EMSG);
220    *rdev->errmsg = 0;
221    rdev->max_block_size = dcr->dev->max_block_size;
222    rdev->min_block_size = dcr->dev->min_block_size;
223    rdev->device = dcr->dev->device;
224    rdcr = new_dcr(NULL, rdev);
225    rdcr->spool_fd = dcr->spool_fd;
226    rdcr->jcr = jcr;                   /* set a valid jcr */
227    block = dcr->block;                /* save block */
228    dcr->block = rdcr->block;          /* make read and write block the same */
229
230    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
231    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
232
233    for ( ; ok; ) {
234       if (job_canceled(jcr)) {
235          ok = false;
236          break;
237       }
238       stat = read_block_from_spool_file(rdcr);
239       if (stat == RB_EOT) {
240          break;
241       } else if (stat == RB_ERROR) {
242          ok = false;
243          break;
244       }
245       ok = write_block_to_device(dcr);
246       if (!ok) {
247          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
248                dcr->dev->print_name(), strerror_dev(dcr->dev));
249       }
250       Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
251    }
252    dcr->block = block;                /* reset block */
253
254    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
255    if (ftruncate(rdcr->spool_fd, 0) != 0) {
256       berrno be;
257       Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
258          be.strerror());
259       Pmsg1(000, "Bad return from ftruncate. ERR=%s\n", be.strerror());
260       ok = false;
261    }
262
263    P(mutex);
264    if (spool_stats.data_size < dcr->spool_size) {
265       spool_stats.data_size = 0;
266    } else {
267       spool_stats.data_size -= dcr->spool_size;
268    }
269    V(mutex);
270    P(dcr->dev->spool_mutex);
271    dcr->dev->spool_size -= dcr->spool_size;
272    dcr->spool_size = 0;               /* zap size in input dcr */
273    V(dcr->dev->spool_mutex);
274    free_memory(rdev->dev_name);
275    free_pool_memory(rdev->errmsg);
276    /* Be careful to NULL the jcr and free rdev after free_dcr() */
277    rdcr->jcr = NULL;
278    free_dcr(rdcr);
279    free(rdev);
280    unlock_device(dcr->dev);
281    dcr->dev_locked = false;
282    dcr->spooling = true;           /* turn on spooling again */
283    return ok;
284 }
285
286 /*
287  * Read a block from the spool file
288  *
289  *  Returns RB_OK on success
290  *          RB_EOT when file done
291  *          RB_ERROR on error
292  */
293 static int read_block_from_spool_file(DCR *dcr)
294 {
295    uint32_t rlen;
296    ssize_t stat;
297    spool_hdr hdr;
298    DEV_BLOCK *block = dcr->block;
299
300    rlen = sizeof(hdr);
301    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
302    if (stat == 0) {
303       Dmsg0(100, "EOT on spool read.\n");
304       return RB_EOT;
305    } else if (stat != (ssize_t)rlen) {
306       if (stat == -1) {
307          berrno be;
308          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
309               be.strerror());
310       } else {
311          Pmsg2(000, "Spool read error. Wanted %u bytes, got %u\n", rlen, stat);
312          Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %u\n"), rlen, stat);
313       }
314       return RB_ERROR;
315    }
316    rlen = hdr.len;
317    if (rlen > block->buf_len) {
318       Pmsg2(000, "Spool block too big. Max %u bytes, got %u\n", block->buf_len, rlen);
319       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
320       return RB_ERROR;
321    }
322    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
323    if (stat != (ssize_t)rlen) {
324       Pmsg2(000, "Spool data read error. Wanted %u bytes, got %u\n", rlen, stat);
325       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %u\n"), rlen, stat);
326       return RB_ERROR;
327    }
328    /* Setup write pointers */
329    block->binbuf = rlen;
330    block->bufp = block->buf + block->binbuf;
331    block->FirstIndex = hdr.FirstIndex;
332    block->LastIndex = hdr.LastIndex;
333    block->VolSessionId = dcr->jcr->VolSessionId;
334    block->VolSessionTime = dcr->jcr->VolSessionTime;
335    Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
336    return RB_OK;
337 }
338
339 /*
340  * Write a block to the spool file
341  *
342  *  Returns: true on success or EOT
343  *           false on hard error
344  */
345 bool write_block_to_spool_file(DCR *dcr)
346 {
347    uint32_t wlen, hlen;               /* length to write */
348    bool despool = false;
349    DEV_BLOCK *block = dcr->block;
350
351    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
352    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
353       return true;
354    }
355
356    hlen = sizeof(spool_hdr);
357    wlen = block->binbuf;
358    P(dcr->dev->spool_mutex);
359    dcr->spool_size += hlen + wlen;
360    dcr->dev->spool_size += hlen + wlen;
361    if ((dcr->max_spool_size > 0 && dcr->spool_size >= dcr->max_spool_size) ||
362        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
363       despool = true;
364    }
365    V(dcr->dev->spool_mutex);
366    P(mutex);
367    spool_stats.data_size += hlen + wlen;
368    if (spool_stats.data_size > spool_stats.max_data_size) {
369       spool_stats.max_data_size = spool_stats.data_size;
370    }
371    V(mutex);
372    if (despool) {
373 #ifdef xDEBUG
374       char ec1[30], ec2[30], ec3[30], ec4[30];
375       Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
376             "max_job_size=%s job_size=%s\n",
377             edit_uint64_with_commas(dcr->max_spool_size, ec1),
378             edit_uint64_with_commas(dcr->spool_size, ec2),
379             edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
380             edit_uint64_with_commas(dcr->dev->spool_size, ec4));
381 #endif
382       Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
383       if (!despool_data(dcr, false)) {
384          Pmsg0(000, "Bad return from despool in write_block.\n");
385          return false;
386       }
387       /* Despooling cleared these variables so reset them */
388       P(dcr->dev->spool_mutex);
389       dcr->spool_size += hlen + wlen;
390       dcr->dev->spool_size += hlen + wlen;
391       V(dcr->dev->spool_mutex);
392       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
393    }
394
395
396    if (!write_spool_header(dcr)) {
397       return false;
398    }
399    if (!write_spool_data(dcr)) {
400      return false;
401    }
402
403    Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
404    empty_block(block);
405    return true;
406 }
407
408 static bool write_spool_header(DCR *dcr)
409 {
410    spool_hdr hdr;
411    ssize_t stat;
412    DEV_BLOCK *block = dcr->block;
413
414    hdr.FirstIndex = block->FirstIndex;
415    hdr.LastIndex = block->LastIndex;
416    hdr.len = block->binbuf;
417
418    /* Write header */
419    for (int retry=0; retry<=1; retry++) {
420       stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
421       if (stat == -1) {
422          berrno be;
423          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
424               be.strerror());
425       }
426       if (stat != (ssize_t)sizeof(hdr)) {
427          /* If we wrote something, truncate it, then despool */
428          if (stat != -1) {
429             if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR) - stat) != 0) {
430                berrno be;
431                Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
432                   be.strerror());
433                return false;
434             }
435          }
436          if (!despool_data(dcr, false)) {
437             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
438             return false;
439          }
440          continue;                    /* try again */
441       }
442       return true;
443    }
444    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
445    return false;
446 }
447
448 static bool write_spool_data(DCR *dcr)
449 {
450    ssize_t stat;
451    DEV_BLOCK *block = dcr->block;
452
453    /* Write data */
454    for (int retry=0; retry<=1; retry++) {
455       stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
456       if (stat == -1) {
457          berrno be;
458          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
459               be.strerror());
460       }
461       if (stat != (ssize_t)block->binbuf) {
462          /*
463           * If we wrote something, truncate it and the header, then despool
464           */
465          if (stat != -1) {
466             if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR)
467                       - stat - sizeof(spool_hdr)) != 0) {
468                berrno be;
469                Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
470                   be.strerror());
471                return false;
472             }
473          }
474          if (!despool_data(dcr, false)) {
475             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
476             return false;
477          }
478          if (!write_spool_header(dcr)) {
479             return false;
480          }
481          continue;                    /* try again */
482       }
483       return true;
484    }
485    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
486    return false;
487 }
488
489
490
491 bool are_attributes_spooled(JCR *jcr)
492 {
493    return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
494 }
495
496 /*
497  * Create spool file for attributes.
498  *  This is done by "attaching" to the bsock, and when
499  *  it is called, the output is written to a file.
500  *  The actual spooling is turned on and off in
501  *  append.c only during writing of the attributes.
502  */
503 bool begin_attribute_spool(JCR *jcr)
504 {
505    if (!jcr->no_attributes && jcr->spool_attributes) {
506       return open_attr_spool_file(jcr, jcr->dir_bsock);
507    }
508    return true;
509 }
510
511 bool discard_attribute_spool(JCR *jcr)
512 {
513    if (are_attributes_spooled(jcr)) {
514       return close_attr_spool_file(jcr, jcr->dir_bsock);
515    }
516    return true;
517 }
518
519 static void update_attr_spool_size(ssize_t size)
520 {
521    P(mutex);
522    if (size > 0) {
523      if ((spool_stats.attr_size - size) > 0) {
524         spool_stats.attr_size -= size;
525      } else {
526         spool_stats.attr_size = 0;
527      }
528    }
529    V(mutex);
530 }
531
532 bool commit_attribute_spool(JCR *jcr)
533 {
534    ssize_t size;
535    char ec1[30];
536
537    if (are_attributes_spooled(jcr)) {
538       if (fseeko(jcr->dir_bsock->spool_fd, 0, SEEK_END) != 0) {
539          berrno be;
540          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
541               be.strerror());
542          goto bail_out;
543       }
544       size = ftello(jcr->dir_bsock->spool_fd);
545       if (size < 0) {
546          berrno be;
547          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
548               be.strerror());
549          goto bail_out;
550       }
551       P(mutex);
552       if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
553          spool_stats.max_attr_size = spool_stats.attr_size + size;
554       }
555       spool_stats.attr_size += size;
556       V(mutex);
557       Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
558             edit_uint64_with_commas(size, ec1));
559       bnet_despool_to_bsock(jcr->dir_bsock, update_attr_spool_size, size);
560       return close_attr_spool_file(jcr, jcr->dir_bsock);
561    }
562    return true;
563
564 bail_out:
565    close_attr_spool_file(jcr, jcr->dir_bsock);
566    return false;
567 }
568
569 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
570 {
571    Mmsg(name, "%s/%s.attr.spool.%s.%d", working_directory, my_name,
572       jcr->Job, fd);
573 }
574
575
576 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
577 {
578    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
579
580    make_unique_spool_filename(jcr, &name, bs->fd);
581    bs->spool_fd = fopen(name, "w+");
582    if (!bs->spool_fd) {
583       berrno be;
584       Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
585            be.strerror());
586       free_pool_memory(name);
587       return false;
588    }
589    P(mutex);
590    spool_stats.attr_jobs++;
591    V(mutex);
592    free_pool_memory(name);
593    return true;
594 }
595
596 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
597 {
598    POOLMEM *name;
599
600    if (!bs->spool_fd) {
601       return true;
602    }
603    name = get_pool_memory(PM_MESSAGE);
604    P(mutex);
605    spool_stats.attr_jobs--;
606    spool_stats.total_attr_jobs++;
607    V(mutex);
608    make_unique_spool_filename(jcr, &name, bs->fd);
609    fclose(bs->spool_fd);
610    unlink(name);
611    free_pool_memory(name);
612    bs->spool_fd = NULL;
613    bs->spool = false;
614    return true;
615 }