]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
- Final tweaks to build Win32.
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2  *  Spooling code
3  *
4  *      Kern Sibbald, March 2004
5  *
6  *  Version $Id$
7  */
8 /*
9    Copyright (C) 2004-2005 Kern Sibbald
10
11    This program is free software; you can redistribute it and/or
12    modify it under the terms of the GNU General Public License
13    version 2 as amended with additional clauses defined in the
14    file LICENSE in the main source directory.
15
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
19    the file LICENSE for additional details.
20
21  */
22
23 #include "bacula.h"
24 #include "stored.h"
25
26 /* Forward referenced subroutines */
27 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
28 static bool open_data_spool_file(DCR *dcr);
29 static bool close_data_spool_file(DCR *dcr);
30 static bool despool_data(DCR *dcr, bool commit);
31 static int  read_block_from_spool_file(DCR *dcr);
32 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
33 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
34 static bool write_spool_header(DCR *dcr);
35 static bool write_spool_data(DCR *dcr);
36
37 struct spool_stats_t {
38    uint32_t data_jobs;                /* current jobs spooling data */
39    uint32_t attr_jobs;
40    uint32_t total_data_jobs;          /* total jobs to have spooled data */
41    uint32_t total_attr_jobs;
42    int64_t max_data_size;             /* max data size */
43    int64_t max_attr_size;
44    int64_t data_size;                 /* current data size (all jobs running) */
45    int64_t attr_size;
46 };
47
48 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
49 spool_stats_t spool_stats;
50
51 /*
52  * Header for data spool record */
53 struct spool_hdr {
54    int32_t  FirstIndex;               /* FirstIndex for buffer */
55    int32_t  LastIndex;                /* LastIndex for buffer */
56    uint32_t len;                      /* length of next buffer */
57 };
58
59 enum {
60    RB_EOT = 1,
61    RB_ERROR,
62    RB_OK
63 };
64
65 void list_spool_stats(BSOCK *bs)
66 {
67    char ed1[30], ed2[30];
68    if (spool_stats.data_jobs || spool_stats.max_data_size) {
69       bnet_fsend(bs, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
70          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
71          spool_stats.total_data_jobs,
72          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
73    }
74    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
75       bnet_fsend(bs, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
76          spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
77          spool_stats.total_attr_jobs,
78          edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
79    }
80 }
81
82 bool begin_data_spool(DCR *dcr)
83 {
84    bool stat = true;
85    if (dcr->jcr->spool_data) {
86       Dmsg0(100, "Turning on data spooling\n");
87       dcr->spool_data = true;
88       stat = open_data_spool_file(dcr);
89       if (stat) {
90          dcr->spooling = true;
91          Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
92          P(mutex);
93          spool_stats.data_jobs++;
94          V(mutex);
95       }
96    }
97    return stat;
98 }
99
100 bool discard_data_spool(DCR *dcr)
101 {
102    if (dcr->spooling) {
103       Dmsg0(100, "Data spooling discarded\n");
104       return close_data_spool_file(dcr);
105    }
106    return true;
107 }
108
109 bool commit_data_spool(DCR *dcr)
110 {
111    bool stat;
112
113    if (dcr->spooling) {
114       Dmsg0(100, "Committing spooled data\n");
115       stat = despool_data(dcr, true /*commit*/);
116       if (!stat) {
117          Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
118          close_data_spool_file(dcr);
119          return false;
120       }
121       return close_data_spool_file(dcr);
122    }
123    return true;
124 }
125
126 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
127 {
128    const char *dir;
129    if (dcr->dev->device->spool_directory) {
130       dir = dcr->dev->device->spool_directory;
131    } else {
132       dir = working_directory;
133    }
134    Mmsg(name, "%s/%s.data.%s.%s.spool", dir, my_name, dcr->jcr->Job, 
135         dcr->device->hdr.name);
136 }
137
138
139 static bool open_data_spool_file(DCR *dcr)
140 {
141    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
142    int spool_fd;
143
144    make_unique_data_spool_filename(dcr, &name);
145    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
146       dcr->spool_fd = spool_fd;
147       dcr->jcr->spool_attributes = true;
148    } else {
149       berrno be;
150       Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
151            be.strerror());
152       free_pool_memory(name);
153       return false;
154    }
155    Dmsg1(100, "Created spool file: %s\n", name);
156    free_pool_memory(name);
157    return true;
158 }
159
160 static bool close_data_spool_file(DCR *dcr)
161 {
162    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
163
164    P(mutex);
165    spool_stats.data_jobs--;
166    spool_stats.total_data_jobs++;
167    if (spool_stats.data_size < dcr->job_spool_size) {
168       spool_stats.data_size = 0;
169    } else {
170       spool_stats.data_size -= dcr->job_spool_size;
171    }
172    dcr->job_spool_size = 0;
173    V(mutex);
174
175    make_unique_data_spool_filename(dcr, &name);
176    close(dcr->spool_fd);
177    dcr->spool_fd = -1;
178    dcr->spooling = false;
179    unlink(name);
180    Dmsg1(100, "Deleted spool file: %s\n", name);
181    free_pool_memory(name);
182    return true;
183 }
184
185 static const char *spool_name = "*spool*";
186
187 static bool despool_data(DCR *dcr, bool commit)
188 {
189    DEVICE *rdev;
190    DCR *rdcr;
191    bool ok = true;
192    DEV_BLOCK *block;
193    JCR *jcr = dcr->jcr;
194    int stat;
195    char ec1[50];
196
197    Dmsg0(100, "Despooling data\n");
198    if (commit) {
199       Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume. Despooling %s bytes ...\n"),
200          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
201    }
202    else {
203       Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
204          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
205    }
206    dcr->spooling = false;
207    lock_device(dcr->dev);
208    dcr->dev_locked = true;
209
210    /*
211     * This is really quite kludgy and should be fixed some time.
212     * We create a dev structure to read from the spool file
213     * in rdev and rdcr.
214     */
215    rdev = (DEVICE *)malloc(sizeof(DEVICE));
216    memset(rdev, 0, sizeof(DEVICE));
217    rdev->dev_name = get_memory(strlen(spool_name)+1);
218    bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
219    rdev->errmsg = get_pool_memory(PM_EMSG);
220    *rdev->errmsg = 0;
221    rdev->max_block_size = dcr->dev->max_block_size;
222    rdev->min_block_size = dcr->dev->min_block_size;
223    rdev->device = dcr->dev->device;
224    rdcr = new_dcr(NULL, rdev);
225    rdcr->spool_fd = dcr->spool_fd;
226    rdcr->jcr = jcr;                   /* set a valid jcr */
227    block = dcr->block;                /* save block */
228    dcr->block = rdcr->block;          /* make read and write block the same */
229
230    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
231    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
232
233    for ( ; ok; ) {
234       if (job_canceled(jcr)) {
235          ok = false;
236          break;
237       }
238       stat = read_block_from_spool_file(rdcr);
239       if (stat == RB_EOT) {
240          break;
241       } else if (stat == RB_ERROR) {
242          ok = false;
243          break;
244       }
245       ok = write_block_to_device(dcr);
246       if (!ok) {
247          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
248                dcr->dev->print_name(), strerror_dev(dcr->dev));
249       }
250       Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
251    }
252    dcr->block = block;                /* reset block */
253
254    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
255    if (ftruncate(rdcr->spool_fd, 0) != 0) {
256       berrno be;
257       Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
258          be.strerror());
259       Pmsg1(000, _("Bad return from ftruncate. ERR=%s\n"), be.strerror());
260       ok = false;
261    }
262
263    P(mutex);
264    if (spool_stats.data_size < dcr->job_spool_size) {
265       spool_stats.data_size = 0;
266    } else {
267       spool_stats.data_size -= dcr->job_spool_size;
268    }
269    V(mutex);
270    P(dcr->dev->spool_mutex);
271    dcr->dev->spool_size -= dcr->job_spool_size;
272    dcr->job_spool_size = 0;            /* zap size in input dcr */
273    V(dcr->dev->spool_mutex);
274    free_memory(rdev->dev_name);
275    free_pool_memory(rdev->errmsg);
276    /* Be careful to NULL the jcr and free rdev after free_dcr() */
277    rdcr->jcr = NULL;
278    free_dcr(rdcr);
279    free(rdev);
280    unlock_device(dcr->dev);
281    dcr->dev_locked = false;
282    dcr->spooling = true;           /* turn on spooling again */
283    return ok;
284 }
285
286 /*
287  * Read a block from the spool file
288  *
289  *  Returns RB_OK on success
290  *          RB_EOT when file done
291  *          RB_ERROR on error
292  */
293 static int read_block_from_spool_file(DCR *dcr)
294 {
295    uint32_t rlen;
296    ssize_t stat;
297    spool_hdr hdr;
298    DEV_BLOCK *block = dcr->block;
299
300    rlen = sizeof(hdr);
301    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
302    if (stat == 0) {
303       Dmsg0(100, "EOT on spool read.\n");
304       return RB_EOT;
305    } else if (stat != (ssize_t)rlen) {
306       if (stat == -1) {
307          berrno be;
308          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
309               be.strerror());
310       } else {
311          Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
312          Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
313       }
314       return RB_ERROR;
315    }
316    rlen = hdr.len;
317    if (rlen > block->buf_len) {
318       Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
319       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
320       return RB_ERROR;
321    }
322    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
323    if (stat != (ssize_t)rlen) {
324       Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
325       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
326       return RB_ERROR;
327    }
328    /* Setup write pointers */
329    block->binbuf = rlen;
330    block->bufp = block->buf + block->binbuf;
331    block->FirstIndex = hdr.FirstIndex;
332    block->LastIndex = hdr.LastIndex;
333    block->VolSessionId = dcr->jcr->VolSessionId;
334    block->VolSessionTime = dcr->jcr->VolSessionTime;
335    Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
336    return RB_OK;
337 }
338
339 /*
340  * Write a block to the spool file
341  *
342  *  Returns: true on success or EOT
343  *           false on hard error
344  */
345 bool write_block_to_spool_file(DCR *dcr)
346 {
347    uint32_t wlen, hlen;               /* length to write */
348    bool despool = false;
349    DEV_BLOCK *block = dcr->block;
350
351    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
352    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
353       return true;
354    }
355
356    hlen = sizeof(spool_hdr);
357    wlen = block->binbuf;
358    P(dcr->dev->spool_mutex);
359    dcr->job_spool_size += hlen + wlen;
360    dcr->dev->spool_size += hlen + wlen;
361    if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
362        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
363       despool = true;
364    }
365    V(dcr->dev->spool_mutex);
366    P(mutex);
367    spool_stats.data_size += hlen + wlen;
368    if (spool_stats.data_size > spool_stats.max_data_size) {
369       spool_stats.max_data_size = spool_stats.data_size;
370    }
371    V(mutex);
372    if (despool) {
373 #ifdef xDEBUG
374       char ec1[30], ec2[30], ec3[30], ec4[30];
375       Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
376             "max_job_size=%s job_size=%s\n",
377             edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
378             edit_uint64_with_commas(dcr->job_spool_size, ec2),
379             edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
380             edit_uint64_with_commas(dcr->dev->spool_size, ec4));
381 #endif
382       Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
383       if (!despool_data(dcr, false)) {
384          Pmsg0(000, _("Bad return from despool in write_block.\n"));
385          return false;
386       }
387       /* Despooling cleared these variables so reset them */
388       P(dcr->dev->spool_mutex);
389       dcr->job_spool_size += hlen + wlen;
390       dcr->dev->spool_size += hlen + wlen;
391       V(dcr->dev->spool_mutex);
392       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
393    }
394
395
396    if (!write_spool_header(dcr)) {
397       return false;
398    }
399    if (!write_spool_data(dcr)) {
400      return false;
401    }
402
403    Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
404    empty_block(block);
405    return true;
406 }
407
408 static bool write_spool_header(DCR *dcr)
409 {
410    spool_hdr hdr;
411    ssize_t stat;
412    DEV_BLOCK *block = dcr->block;
413
414    hdr.FirstIndex = block->FirstIndex;
415    hdr.LastIndex = block->LastIndex;
416    hdr.len = block->binbuf;
417
418    /* Write header */
419    for (int retry=0; retry<=1; retry++) {
420       stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
421       if (stat == -1) {
422          berrno be;
423          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
424               be.strerror());
425       }
426       if (stat != (ssize_t)sizeof(hdr)) {
427          /* If we wrote something, truncate it, then despool */
428          if (stat != -1) {
429             if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR) - stat) != 0) {
430                berrno be;
431                Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
432                   be.strerror());
433                return false;
434             }
435          }
436          if (!despool_data(dcr, false)) {
437             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
438             return false;
439          }
440          continue;                    /* try again */
441       }
442       return true;
443    }
444    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
445    return false;
446 }
447
448 static bool write_spool_data(DCR *dcr)
449 {
450    ssize_t stat;
451    DEV_BLOCK *block = dcr->block;
452
453    /* Write data */
454    for (int retry=0; retry<=1; retry++) {
455       stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
456       if (stat == -1) {
457          berrno be;
458          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
459               be.strerror());
460       }
461       if (stat != (ssize_t)block->binbuf) {
462          /*
463           * If we wrote something, truncate it and the header, then despool
464           */
465          if (stat != -1) {
466             if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR)
467                       - stat - sizeof(spool_hdr)) != 0) {
468                berrno be;
469                Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
470                   be.strerror());
471                return false;
472             }
473          }
474          if (!despool_data(dcr, false)) {
475             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
476             return false;
477          }
478          if (!write_spool_header(dcr)) {
479             return false;
480          }
481          continue;                    /* try again */
482       }
483       return true;
484    }
485    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
486    return false;
487 }
488
489
490
491 bool are_attributes_spooled(JCR *jcr)
492 {
493    return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
494 }
495
496 /*
497  * Create spool file for attributes.
498  *  This is done by "attaching" to the bsock, and when
499  *  it is called, the output is written to a file.
500  *  The actual spooling is turned on and off in
501  *  append.c only during writing of the attributes.
502  */
503 bool begin_attribute_spool(JCR *jcr)
504 {
505    if (!jcr->no_attributes && jcr->spool_attributes) {
506       return open_attr_spool_file(jcr, jcr->dir_bsock);
507    }
508    return true;
509 }
510
511 bool discard_attribute_spool(JCR *jcr)
512 {
513    if (are_attributes_spooled(jcr)) {
514       return close_attr_spool_file(jcr, jcr->dir_bsock);
515    }
516    return true;
517 }
518
519 static void update_attr_spool_size(ssize_t size)
520 {
521    P(mutex);
522    if (size > 0) {
523      if ((spool_stats.attr_size - size) > 0) {
524         spool_stats.attr_size -= size;
525      } else {
526         spool_stats.attr_size = 0;
527      }
528    }
529    V(mutex);
530 }
531
532 bool commit_attribute_spool(JCR *jcr)
533 {
534    off_t size;
535    char ec1[30];
536
537    if (are_attributes_spooled(jcr)) {
538       if (fseeko(jcr->dir_bsock->spool_fd, 0, SEEK_END) != 0) {
539          berrno be;
540          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
541               be.strerror());
542          goto bail_out;
543       }
544       size = ftello(jcr->dir_bsock->spool_fd);
545       if (size < 0) {
546          berrno be;
547          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
548               be.strerror());
549          goto bail_out;
550       }
551       P(mutex);
552       if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
553          spool_stats.max_attr_size = spool_stats.attr_size + size;
554       }
555       spool_stats.attr_size += size;
556       V(mutex);
557       Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
558             edit_uint64_with_commas(size, ec1));
559       bnet_despool_to_bsock(jcr->dir_bsock, update_attr_spool_size, size);
560       return close_attr_spool_file(jcr, jcr->dir_bsock);
561    }
562    return true;
563
564 bail_out:
565    close_attr_spool_file(jcr, jcr->dir_bsock);
566    return false;
567 }
568
569 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
570 {
571    Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
572       jcr->Job, fd);
573 }
574
575
576 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
577 {
578    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
579
580    make_unique_spool_filename(jcr, &name, bs->fd);
581    bs->spool_fd = fopen(name, "w+");
582    if (!bs->spool_fd) {
583       berrno be;
584       Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
585            be.strerror());
586       free_pool_memory(name);
587       return false;
588    }
589    P(mutex);
590    spool_stats.attr_jobs++;
591    V(mutex);
592    free_pool_memory(name);
593    return true;
594 }
595
596 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
597 {
598    POOLMEM *name;
599
600    if (!bs->spool_fd) {
601       return true;
602    }
603    name = get_pool_memory(PM_MESSAGE);
604    P(mutex);
605    spool_stats.attr_jobs--;
606    spool_stats.total_attr_jobs++;
607    V(mutex);
608    make_unique_spool_filename(jcr, &name, bs->fd);
609    fclose(bs->spool_fd);
610    unlink(name);
611    free_pool_memory(name);
612    bs->spool_fd = NULL;
613    bs->spool = false;
614    return true;
615 }