]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
This commit was manufactured by cvs2svn to create tag
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2  *  Spooling code
3  *
4  *      Kern Sibbald, March 2004
5  *
6  *  Version $Id$
7  */
8 /*
9    Copyright (C) 2004-2005 Kern Sibbald
10
11    This program is free software; you can redistribute it and/or
12    modify it under the terms of the GNU General Public License
13    version 2 as amended with additional clauses defined in the
14    file LICENSE in the main source directory.
15
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
19    the file LICENSE for additional details.
20
21  */
22
23 #include "bacula.h"
24 #include "stored.h"
25
26 /* Forward referenced subroutines */
27 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
28 static bool open_data_spool_file(DCR *dcr);
29 static bool close_data_spool_file(DCR *dcr);
30 static bool despool_data(DCR *dcr, bool commit);
31 static int  read_block_from_spool_file(DCR *dcr);
32 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
33 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
34 static bool write_spool_header(DCR *dcr);
35 static bool write_spool_data(DCR *dcr);
36
37 struct spool_stats_t {
38    uint32_t data_jobs;                /* current jobs spooling data */
39    uint32_t attr_jobs;
40    uint32_t total_data_jobs;          /* total jobs to have spooled data */
41    uint32_t total_attr_jobs;
42    int64_t max_data_size;             /* max data size */
43    int64_t max_attr_size;
44    int64_t data_size;                 /* current data size (all jobs running) */
45    int64_t attr_size;
46 };
47
48 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
49 spool_stats_t spool_stats;
50
51 /*
52  * Header for data spool record */
53 struct spool_hdr {
54    int32_t  FirstIndex;               /* FirstIndex for buffer */
55    int32_t  LastIndex;                /* LastIndex for buffer */
56    uint32_t len;                      /* length of next buffer */
57 };
58
59 enum {
60    RB_EOT = 1,
61    RB_ERROR,
62    RB_OK
63 };
64
65 void list_spool_stats(BSOCK *bs)
66 {
67    char ed1[30], ed2[30];
68    if (spool_stats.data_jobs || spool_stats.max_data_size) {
69       bnet_fsend(bs, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
70          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
71          spool_stats.total_data_jobs,
72          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
73    }
74    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
75       bnet_fsend(bs, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
76          spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
77          spool_stats.total_attr_jobs,
78          edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
79    }
80 }
81
82 bool begin_data_spool(DCR *dcr)
83 {
84    bool stat = true;
85    if (dcr->jcr->spool_data) {
86       Dmsg0(100, "Turning on data spooling\n");
87       dcr->spool_data = true;
88       stat = open_data_spool_file(dcr);
89       if (stat) {
90          dcr->spooling = true;
91          Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
92          P(mutex);
93          spool_stats.data_jobs++;
94          V(mutex);
95       }
96    }
97    return stat;
98 }
99
100 bool discard_data_spool(DCR *dcr)
101 {
102    if (dcr->spooling) {
103       Dmsg0(100, "Data spooling discarded\n");
104       return close_data_spool_file(dcr);
105    }
106    return true;
107 }
108
109 bool commit_data_spool(DCR *dcr)
110 {
111    bool stat;
112
113    if (dcr->spooling) {
114       Dmsg0(100, "Committing spooled data\n");
115       stat = despool_data(dcr, true /*commit*/);
116       if (!stat) {
117          Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
118          close_data_spool_file(dcr);
119          return false;
120       }
121       return close_data_spool_file(dcr);
122    }
123    return true;
124 }
125
126 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
127 {
128    const char *dir;
129    if (dcr->dev->device->spool_directory) {
130       dir = dcr->dev->device->spool_directory;
131    } else {
132       dir = working_directory;
133    }
134    Mmsg(name, "%s/%s.data.%s.%s.spool", dir, my_name, dcr->jcr->Job, 
135         dcr->device->hdr.name);
136 }
137
138
139 static bool open_data_spool_file(DCR *dcr)
140 {
141    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
142    int spool_fd;
143
144    make_unique_data_spool_filename(dcr, &name);
145    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
146       dcr->spool_fd = spool_fd;
147       dcr->jcr->spool_attributes = true;
148    } else {
149       berrno be;
150       Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
151            be.strerror());
152       free_pool_memory(name);
153       return false;
154    }
155    Dmsg1(100, "Created spool file: %s\n", name);
156    free_pool_memory(name);
157    return true;
158 }
159
160 static bool close_data_spool_file(DCR *dcr)
161 {
162    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
163
164    P(mutex);
165    spool_stats.data_jobs--;
166    spool_stats.total_data_jobs++;
167    if (spool_stats.data_size < dcr->job_spool_size) {
168       spool_stats.data_size = 0;
169    } else {
170       spool_stats.data_size -= dcr->job_spool_size;
171    }
172    dcr->job_spool_size = 0;
173    V(mutex);
174
175    make_unique_data_spool_filename(dcr, &name);
176    close(dcr->spool_fd);
177    dcr->spool_fd = -1;
178    dcr->spooling = false;
179    unlink(name);
180    Dmsg1(100, "Deleted spool file: %s\n", name);
181    free_pool_memory(name);
182    return true;
183 }
184
185 static const char *spool_name = "*spool*";
186
187 static bool despool_data(DCR *dcr, bool commit)
188 {
189    DEVICE *rdev;
190    DCR *rdcr;
191    bool ok = true;
192    DEV_BLOCK *block;
193    JCR *jcr = dcr->jcr;
194    int stat;
195    char ec1[50];
196
197    Dmsg0(100, "Despooling data\n");
198    if (commit) {
199       Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
200          jcr->dcr->VolumeName,
201          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
202    }
203    else {
204       Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
205          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
206    }
207    dcr->spooling = false;
208    lock_device(dcr->dev);
209    dcr->dev_locked = true;
210
211    /*
212     * This is really quite kludgy and should be fixed some time.
213     * We create a dev structure to read from the spool file
214     * in rdev and rdcr.
215     */
216    rdev = (DEVICE *)malloc(sizeof(DEVICE));
217    memset(rdev, 0, sizeof(DEVICE));
218    rdev->dev_name = get_memory(strlen(spool_name)+1);
219    bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
220    rdev->errmsg = get_pool_memory(PM_EMSG);
221    *rdev->errmsg = 0;
222    rdev->max_block_size = dcr->dev->max_block_size;
223    rdev->min_block_size = dcr->dev->min_block_size;
224    rdev->device = dcr->dev->device;
225    rdcr = new_dcr(NULL, rdev);
226    rdcr->spool_fd = dcr->spool_fd;
227    rdcr->jcr = jcr;                   /* set a valid jcr */
228    block = dcr->block;                /* save block */
229    dcr->block = rdcr->block;          /* make read and write block the same */
230
231    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
232    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
233
234    for ( ; ok; ) {
235       if (job_canceled(jcr)) {
236          ok = false;
237          break;
238       }
239       stat = read_block_from_spool_file(rdcr);
240       if (stat == RB_EOT) {
241          break;
242       } else if (stat == RB_ERROR) {
243          ok = false;
244          break;
245       }
246       ok = write_block_to_device(dcr);
247       if (!ok) {
248          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
249                dcr->dev->print_name(), strerror_dev(dcr->dev));
250       }
251       Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
252    }
253    dcr->block = block;                /* reset block */
254
255    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
256    if (ftruncate(rdcr->spool_fd, 0) != 0) {
257       berrno be;
258       Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
259          be.strerror());
260       Pmsg1(000, _("Bad return from ftruncate. ERR=%s\n"), be.strerror());
261       ok = false;
262    }
263
264    P(mutex);
265    if (spool_stats.data_size < dcr->job_spool_size) {
266       spool_stats.data_size = 0;
267    } else {
268       spool_stats.data_size -= dcr->job_spool_size;
269    }
270    V(mutex);
271    P(dcr->dev->spool_mutex);
272    dcr->dev->spool_size -= dcr->job_spool_size;
273    dcr->job_spool_size = 0;            /* zap size in input dcr */
274    V(dcr->dev->spool_mutex);
275    free_memory(rdev->dev_name);
276    free_pool_memory(rdev->errmsg);
277    /* Be careful to NULL the jcr and free rdev after free_dcr() */
278    rdcr->jcr = NULL;
279    free_dcr(rdcr);
280    free(rdev);
281    unlock_device(dcr->dev);
282    dcr->dev_locked = false;
283    dcr->spooling = true;           /* turn on spooling again */
284    return ok;
285 }
286
287 /*
288  * Read a block from the spool file
289  *
290  *  Returns RB_OK on success
291  *          RB_EOT when file done
292  *          RB_ERROR on error
293  */
294 static int read_block_from_spool_file(DCR *dcr)
295 {
296    uint32_t rlen;
297    ssize_t stat;
298    spool_hdr hdr;
299    DEV_BLOCK *block = dcr->block;
300
301    rlen = sizeof(hdr);
302    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
303    if (stat == 0) {
304       Dmsg0(100, "EOT on spool read.\n");
305       return RB_EOT;
306    } else if (stat != (ssize_t)rlen) {
307       if (stat == -1) {
308          berrno be;
309          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
310               be.strerror());
311       } else {
312          Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
313          Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
314       }
315       return RB_ERROR;
316    }
317    rlen = hdr.len;
318    if (rlen > block->buf_len) {
319       Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
320       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
321       return RB_ERROR;
322    }
323    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
324    if (stat != (ssize_t)rlen) {
325       Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
326       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
327       return RB_ERROR;
328    }
329    /* Setup write pointers */
330    block->binbuf = rlen;
331    block->bufp = block->buf + block->binbuf;
332    block->FirstIndex = hdr.FirstIndex;
333    block->LastIndex = hdr.LastIndex;
334    block->VolSessionId = dcr->jcr->VolSessionId;
335    block->VolSessionTime = dcr->jcr->VolSessionTime;
336    Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
337    return RB_OK;
338 }
339
340 /*
341  * Write a block to the spool file
342  *
343  *  Returns: true on success or EOT
344  *           false on hard error
345  */
346 bool write_block_to_spool_file(DCR *dcr)
347 {
348    uint32_t wlen, hlen;               /* length to write */
349    bool despool = false;
350    DEV_BLOCK *block = dcr->block;
351
352    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
353    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
354       return true;
355    }
356
357    hlen = sizeof(spool_hdr);
358    wlen = block->binbuf;
359    P(dcr->dev->spool_mutex);
360    dcr->job_spool_size += hlen + wlen;
361    dcr->dev->spool_size += hlen + wlen;
362    if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
363        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
364       despool = true;
365    }
366    V(dcr->dev->spool_mutex);
367    P(mutex);
368    spool_stats.data_size += hlen + wlen;
369    if (spool_stats.data_size > spool_stats.max_data_size) {
370       spool_stats.max_data_size = spool_stats.data_size;
371    }
372    V(mutex);
373    if (despool) {
374 #ifdef xDEBUG
375       char ec1[30], ec2[30], ec3[30], ec4[30];
376       Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
377             "max_job_size=%s job_size=%s\n",
378             edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
379             edit_uint64_with_commas(dcr->job_spool_size, ec2),
380             edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
381             edit_uint64_with_commas(dcr->dev->spool_size, ec4));
382 #endif
383       Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
384       if (!despool_data(dcr, false)) {
385          Pmsg0(000, _("Bad return from despool in write_block.\n"));
386          return false;
387       }
388       /* Despooling cleared these variables so reset them */
389       P(dcr->dev->spool_mutex);
390       dcr->job_spool_size += hlen + wlen;
391       dcr->dev->spool_size += hlen + wlen;
392       V(dcr->dev->spool_mutex);
393       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
394    }
395
396
397    if (!write_spool_header(dcr)) {
398       return false;
399    }
400    if (!write_spool_data(dcr)) {
401      return false;
402    }
403
404    Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
405    empty_block(block);
406    return true;
407 }
408
409 static bool write_spool_header(DCR *dcr)
410 {
411    spool_hdr hdr;
412    ssize_t stat;
413    DEV_BLOCK *block = dcr->block;
414
415    hdr.FirstIndex = block->FirstIndex;
416    hdr.LastIndex = block->LastIndex;
417    hdr.len = block->binbuf;
418
419    /* Write header */
420    for (int retry=0; retry<=1; retry++) {
421       stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
422       if (stat == -1) {
423          berrno be;
424          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
425               be.strerror());
426       }
427       if (stat != (ssize_t)sizeof(hdr)) {
428          /* If we wrote something, truncate it, then despool */
429          if (stat != -1) {
430             if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR) - stat) != 0) {
431                berrno be;
432                Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
433                   be.strerror());
434                return false;
435             }
436          }
437          if (!despool_data(dcr, false)) {
438             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
439             return false;
440          }
441          continue;                    /* try again */
442       }
443       return true;
444    }
445    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
446    return false;
447 }
448
449 static bool write_spool_data(DCR *dcr)
450 {
451    ssize_t stat;
452    DEV_BLOCK *block = dcr->block;
453
454    /* Write data */
455    for (int retry=0; retry<=1; retry++) {
456       stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
457       if (stat == -1) {
458          berrno be;
459          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
460               be.strerror());
461       }
462       if (stat != (ssize_t)block->binbuf) {
463          /*
464           * If we wrote something, truncate it and the header, then despool
465           */
466          if (stat != -1) {
467             if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR)
468                       - stat - sizeof(spool_hdr)) != 0) {
469                berrno be;
470                Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
471                   be.strerror());
472                return false;
473             }
474          }
475          if (!despool_data(dcr, false)) {
476             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
477             return false;
478          }
479          if (!write_spool_header(dcr)) {
480             return false;
481          }
482          continue;                    /* try again */
483       }
484       return true;
485    }
486    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
487    return false;
488 }
489
490
491
492 bool are_attributes_spooled(JCR *jcr)
493 {
494    return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
495 }
496
497 /*
498  * Create spool file for attributes.
499  *  This is done by "attaching" to the bsock, and when
500  *  it is called, the output is written to a file.
501  *  The actual spooling is turned on and off in
502  *  append.c only during writing of the attributes.
503  */
504 bool begin_attribute_spool(JCR *jcr)
505 {
506    if (!jcr->no_attributes && jcr->spool_attributes) {
507       return open_attr_spool_file(jcr, jcr->dir_bsock);
508    }
509    return true;
510 }
511
512 bool discard_attribute_spool(JCR *jcr)
513 {
514    if (are_attributes_spooled(jcr)) {
515       return close_attr_spool_file(jcr, jcr->dir_bsock);
516    }
517    return true;
518 }
519
520 static void update_attr_spool_size(ssize_t size)
521 {
522    P(mutex);
523    if (size > 0) {
524      if ((spool_stats.attr_size - size) > 0) {
525         spool_stats.attr_size -= size;
526      } else {
527         spool_stats.attr_size = 0;
528      }
529    }
530    V(mutex);
531 }
532
533 bool commit_attribute_spool(JCR *jcr)
534 {
535    off_t size;
536    char ec1[30];
537
538    if (are_attributes_spooled(jcr)) {
539       if (fseeko(jcr->dir_bsock->spool_fd, 0, SEEK_END) != 0) {
540          berrno be;
541          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
542               be.strerror());
543          goto bail_out;
544       }
545       size = ftello(jcr->dir_bsock->spool_fd);
546       if (size < 0) {
547          berrno be;
548          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
549               be.strerror());
550          goto bail_out;
551       }
552       P(mutex);
553       if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
554          spool_stats.max_attr_size = spool_stats.attr_size + size;
555       }
556       spool_stats.attr_size += size;
557       V(mutex);
558       Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
559             edit_uint64_with_commas(size, ec1));
560       bnet_despool_to_bsock(jcr->dir_bsock, update_attr_spool_size, size);
561       return close_attr_spool_file(jcr, jcr->dir_bsock);
562    }
563    return true;
564
565 bail_out:
566    close_attr_spool_file(jcr, jcr->dir_bsock);
567    return false;
568 }
569
570 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
571 {
572    Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
573       jcr->Job, fd);
574 }
575
576
577 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
578 {
579    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
580
581    make_unique_spool_filename(jcr, &name, bs->fd);
582    bs->spool_fd = fopen(name, "w+");
583    if (!bs->spool_fd) {
584       berrno be;
585       Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
586            be.strerror());
587       free_pool_memory(name);
588       return false;
589    }
590    P(mutex);
591    spool_stats.attr_jobs++;
592    V(mutex);
593    free_pool_memory(name);
594    return true;
595 }
596
597 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
598 {
599    POOLMEM *name;
600
601    if (!bs->spool_fd) {
602       return true;
603    }
604    name = get_pool_memory(PM_MESSAGE);
605    P(mutex);
606    spool_stats.attr_jobs--;
607    spool_stats.total_attr_jobs++;
608    V(mutex);
609    make_unique_spool_filename(jcr, &name, bs->fd);
610    fclose(bs->spool_fd);
611    unlink(name);
612    free_pool_memory(name);
613    bs->spool_fd = NULL;
614    bs->spool = false;
615    return true;
616 }