]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
19July06
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2  *  Spooling code
3  *
4  *      Kern Sibbald, March 2004
5  *
6  *  Version $Id$
7  */
8 /*
9    Copyright (C) 2004-2006 Kern Sibbald
10
11    This program is free software; you can redistribute it and/or
12    modify it under the terms of the GNU General Public License
13    version 2 as amended with additional clauses defined in the
14    file LICENSE in the main source directory.
15
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
19    the file LICENSE for additional details.
20
21  */
22
23 #include "bacula.h"
24 #include "stored.h"
25
26 /* Forward referenced subroutines */
27 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
28 static bool open_data_spool_file(DCR *dcr);
29 static bool close_data_spool_file(DCR *dcr);
30 static bool despool_data(DCR *dcr, bool commit);
31 static int  read_block_from_spool_file(DCR *dcr);
32 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
33 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
34 static bool write_spool_header(DCR *dcr);
35 static bool write_spool_data(DCR *dcr);
36
37 struct spool_stats_t {
38    uint32_t data_jobs;                /* current jobs spooling data */
39    uint32_t attr_jobs;
40    uint32_t total_data_jobs;          /* total jobs to have spooled data */
41    uint32_t total_attr_jobs;
42    int64_t max_data_size;             /* max data size */
43    int64_t max_attr_size;
44    int64_t data_size;                 /* current data size (all jobs running) */
45    int64_t attr_size;
46 };
47
48 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
49 spool_stats_t spool_stats;
50
51 /*
52  * Header for data spool record */
53 struct spool_hdr {
54    int32_t  FirstIndex;               /* FirstIndex for buffer */
55    int32_t  LastIndex;                /* LastIndex for buffer */
56    uint32_t len;                      /* length of next buffer */
57 };
58
59 enum {
60    RB_EOT = 1,
61    RB_ERROR,
62    RB_OK
63 };
64
65 void list_spool_stats(BSOCK *bs)
66 {
67    char ed1[30], ed2[30];
68    if (spool_stats.data_jobs || spool_stats.max_data_size) {
69       bnet_fsend(bs, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
70          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
71          spool_stats.total_data_jobs,
72          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
73    }
74    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
75       bnet_fsend(bs, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
76          spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
77          spool_stats.total_attr_jobs,
78          edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
79    }
80 }
81
82 bool begin_data_spool(DCR *dcr)
83 {
84    bool stat = true;
85    if (dcr->jcr->spool_data) {
86       Dmsg0(100, "Turning on data spooling\n");
87       dcr->spool_data = true;
88       stat = open_data_spool_file(dcr);
89       if (stat) {
90          dcr->spooling = true;
91          Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
92          P(mutex);
93          spool_stats.data_jobs++;
94          V(mutex);
95       }
96    }
97    return stat;
98 }
99
100 bool discard_data_spool(DCR *dcr)
101 {
102    if (dcr->spooling) {
103       Dmsg0(100, "Data spooling discarded\n");
104       return close_data_spool_file(dcr);
105    }
106    return true;
107 }
108
109 bool commit_data_spool(DCR *dcr)
110 {
111    bool stat;
112
113    if (dcr->spooling) {
114       Dmsg0(100, "Committing spooled data\n");
115       stat = despool_data(dcr, true /*commit*/);
116       if (!stat) {
117          Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
118          close_data_spool_file(dcr);
119          return false;
120       }
121       return close_data_spool_file(dcr);
122    }
123    return true;
124 }
125
126 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
127 {
128    const char *dir;
129    if (dcr->dev->device->spool_directory) {
130       dir = dcr->dev->device->spool_directory;
131    } else {
132       dir = working_directory;
133    }
134    Mmsg(name, "%s/%s.data.%s.%s.spool", dir, my_name, dcr->jcr->Job, 
135         dcr->device->hdr.name);
136 }
137
138
139 static bool open_data_spool_file(DCR *dcr)
140 {
141    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
142    int spool_fd;
143
144    make_unique_data_spool_filename(dcr, &name);
145    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
146       dcr->spool_fd = spool_fd;
147       dcr->jcr->spool_attributes = true;
148    } else {
149       berrno be;
150       Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
151            be.strerror());
152       free_pool_memory(name);
153       return false;
154    }
155    Dmsg1(100, "Created spool file: %s\n", name);
156    free_pool_memory(name);
157    return true;
158 }
159
160 static bool close_data_spool_file(DCR *dcr)
161 {
162    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
163
164    P(mutex);
165    spool_stats.data_jobs--;
166    spool_stats.total_data_jobs++;
167    if (spool_stats.data_size < dcr->job_spool_size) {
168       spool_stats.data_size = 0;
169    } else {
170       spool_stats.data_size -= dcr->job_spool_size;
171    }
172    dcr->job_spool_size = 0;
173    V(mutex);
174
175    make_unique_data_spool_filename(dcr, &name);
176    close(dcr->spool_fd);
177    dcr->spool_fd = -1;
178    dcr->spooling = false;
179    unlink(name);
180    Dmsg1(100, "Deleted spool file: %s\n", name);
181    free_pool_memory(name);
182    return true;
183 }
184
185 static const char *spool_name = "*spool*";
186
187 static bool despool_data(DCR *dcr, bool commit)
188 {
189    DEVICE *rdev;
190    DCR *rdcr;
191    bool ok = true;
192    DEV_BLOCK *block;
193    JCR *jcr = dcr->jcr;
194    int stat;
195    char ec1[50];
196
197    Dmsg0(100, "Despooling data\n");
198    /* Commit means that the job is done, so we commit, otherwise, we
199     *  are despooling because of user spool size max or some error  
200     *  (e.g. filesystem full).
201     */
202    if (commit) {
203       Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
204          jcr->dcr->VolumeName,
205          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
206    } else {
207       Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
208          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
209    }
210    dcr->spooling = false;
211    dcr->despooling = true;
212    lock_device(dcr->dev);
213    dcr->dev_locked = true;
214
215    /*
216     * This is really quite kludgy and should be fixed some time.
217     * We create a dev structure to read from the spool file
218     * in rdev and rdcr.
219     */
220    rdev = (DEVICE *)malloc(sizeof(DEVICE));
221    memset(rdev, 0, sizeof(DEVICE));
222    rdev->dev_name = get_memory(strlen(spool_name)+1);
223    bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
224    rdev->errmsg = get_pool_memory(PM_EMSG);
225    *rdev->errmsg = 0;
226    rdev->max_block_size = dcr->dev->max_block_size;
227    rdev->min_block_size = dcr->dev->min_block_size;
228    rdev->device = dcr->dev->device;
229    rdcr = new_dcr(NULL, rdev);
230    rdcr->spool_fd = dcr->spool_fd;
231    rdcr->jcr = jcr;                   /* set a valid jcr */
232    block = dcr->block;                /* save block */
233    dcr->block = rdcr->block;          /* make read and write block the same */
234
235    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
236    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
237
238    for ( ; ok; ) {
239       if (job_canceled(jcr)) {
240          ok = false;
241          break;
242       }
243       stat = read_block_from_spool_file(rdcr);
244       if (stat == RB_EOT) {
245          break;
246       } else if (stat == RB_ERROR) {
247          ok = false;
248          break;
249       }
250       ok = write_block_to_device(dcr);
251       if (!ok) {
252          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
253                dcr->dev->print_name(), dcr->dev->bstrerror());
254       }
255       Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
256    }
257    dcr->block = block;                /* reset block */
258
259    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
260    if (ftruncate(rdcr->spool_fd, 0) != 0) {
261       berrno be;
262       Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
263          be.strerror());
264       Pmsg1(000, _("Bad return from ftruncate. ERR=%s\n"), be.strerror());
265       ok = false;
266    }
267
268    P(mutex);
269    if (spool_stats.data_size < dcr->job_spool_size) {
270       spool_stats.data_size = 0;
271    } else {
272       spool_stats.data_size -= dcr->job_spool_size;
273    }
274    V(mutex);
275    P(dcr->dev->spool_mutex);
276    dcr->dev->spool_size -= dcr->job_spool_size;
277    dcr->job_spool_size = 0;            /* zap size in input dcr */
278    V(dcr->dev->spool_mutex);
279    free_memory(rdev->dev_name);
280    free_pool_memory(rdev->errmsg);
281    /* Be careful to NULL the jcr and free rdev after free_dcr() */
282    rdcr->jcr = NULL;
283    free_dcr(rdcr);
284    free(rdev);
285    unlock_device(dcr->dev);
286    dcr->dev_locked = false;
287    dcr->spooling = true;           /* turn on spooling again */
288    dcr->despooling = false;
289    return ok;
290 }
291
292 /*
293  * Read a block from the spool file
294  *
295  *  Returns RB_OK on success
296  *          RB_EOT when file done
297  *          RB_ERROR on error
298  */
299 static int read_block_from_spool_file(DCR *dcr)
300 {
301    uint32_t rlen;
302    ssize_t stat;
303    spool_hdr hdr;
304    DEV_BLOCK *block = dcr->block;
305
306    rlen = sizeof(hdr);
307    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
308    if (stat == 0) {
309       Dmsg0(100, "EOT on spool read.\n");
310       return RB_EOT;
311    } else if (stat != (ssize_t)rlen) {
312       if (stat == -1) {
313          berrno be;
314          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
315               be.strerror());
316       } else {
317          Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
318          Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
319       }
320       return RB_ERROR;
321    }
322    rlen = hdr.len;
323    if (rlen > block->buf_len) {
324       Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
325       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
326       return RB_ERROR;
327    }
328    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
329    if (stat != (ssize_t)rlen) {
330       Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
331       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
332       return RB_ERROR;
333    }
334    /* Setup write pointers */
335    block->binbuf = rlen;
336    block->bufp = block->buf + block->binbuf;
337    block->FirstIndex = hdr.FirstIndex;
338    block->LastIndex = hdr.LastIndex;
339    block->VolSessionId = dcr->jcr->VolSessionId;
340    block->VolSessionTime = dcr->jcr->VolSessionTime;
341    Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
342    return RB_OK;
343 }
344
345 /*
346  * Write a block to the spool file
347  *
348  *  Returns: true on success or EOT
349  *           false on hard error
350  */
351 bool write_block_to_spool_file(DCR *dcr)
352 {
353    uint32_t wlen, hlen;               /* length to write */
354    bool despool = false;
355    DEV_BLOCK *block = dcr->block;
356
357    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
358    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
359       return true;
360    }
361
362    hlen = sizeof(spool_hdr);
363    wlen = block->binbuf;
364    P(dcr->dev->spool_mutex);
365    dcr->job_spool_size += hlen + wlen;
366    dcr->dev->spool_size += hlen + wlen;
367    if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
368        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
369       despool = true;
370    }
371    V(dcr->dev->spool_mutex);
372    P(mutex);
373    spool_stats.data_size += hlen + wlen;
374    if (spool_stats.data_size > spool_stats.max_data_size) {
375       spool_stats.max_data_size = spool_stats.data_size;
376    }
377    V(mutex);
378    if (despool) {
379 #ifdef xDEBUG
380       char ec1[30], ec2[30], ec3[30], ec4[30];
381       Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
382             "max_job_size=%s job_size=%s\n",
383             edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
384             edit_uint64_with_commas(dcr->job_spool_size, ec2),
385             edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
386             edit_uint64_with_commas(dcr->dev->spool_size, ec4));
387 #endif
388       Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
389       if (!despool_data(dcr, false)) {
390          Pmsg0(000, _("Bad return from despool in write_block.\n"));
391          return false;
392       }
393       /* Despooling cleared these variables so reset them */
394       P(dcr->dev->spool_mutex);
395       dcr->job_spool_size += hlen + wlen;
396       dcr->dev->spool_size += hlen + wlen;
397       V(dcr->dev->spool_mutex);
398       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
399    }
400
401
402    if (!write_spool_header(dcr)) {
403       return false;
404    }
405    if (!write_spool_data(dcr)) {
406      return false;
407    }
408
409    Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
410    empty_block(block);
411    return true;
412 }
413
414 static bool write_spool_header(DCR *dcr)
415 {
416    spool_hdr hdr;
417    ssize_t stat;
418    DEV_BLOCK *block = dcr->block;
419
420    hdr.FirstIndex = block->FirstIndex;
421    hdr.LastIndex = block->LastIndex;
422    hdr.len = block->binbuf;
423
424    /* Write header */
425    for (int retry=0; retry<=1; retry++) {
426       stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
427       if (stat == -1) {
428          berrno be;
429          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
430               be.strerror());
431       }
432       if (stat != (ssize_t)sizeof(hdr)) {
433          /* If we wrote something, truncate it, then despool */
434          if (stat != -1) {
435             if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR) - stat) != 0) {
436                berrno be;
437                Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
438                   be.strerror());
439                return false;
440             }
441          }
442          if (!despool_data(dcr, false)) {
443             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
444             return false;
445          }
446          continue;                    /* try again */
447       }
448       return true;
449    }
450    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
451    return false;
452 }
453
454 static bool write_spool_data(DCR *dcr)
455 {
456    ssize_t stat;
457    DEV_BLOCK *block = dcr->block;
458
459    /* Write data */
460    for (int retry=0; retry<=1; retry++) {
461       stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
462       if (stat == -1) {
463          berrno be;
464          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
465               be.strerror());
466       }
467       if (stat != (ssize_t)block->binbuf) {
468          /*
469           * If we wrote something, truncate it and the header, then despool
470           */
471          if (stat != -1) {
472             if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR)
473                       - stat - sizeof(spool_hdr)) != 0) {
474                berrno be;
475                Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
476                   be.strerror());
477                return false;
478             }
479          }
480          if (!despool_data(dcr, false)) {
481             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
482             return false;
483          }
484          if (!write_spool_header(dcr)) {
485             return false;
486          }
487          continue;                    /* try again */
488       }
489       return true;
490    }
491    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
492    return false;
493 }
494
495
496
497 bool are_attributes_spooled(JCR *jcr)
498 {
499    return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
500 }
501
502 /*
503  * Create spool file for attributes.
504  *  This is done by "attaching" to the bsock, and when
505  *  it is called, the output is written to a file.
506  *  The actual spooling is turned on and off in
507  *  append.c only during writing of the attributes.
508  */
509 bool begin_attribute_spool(JCR *jcr)
510 {
511    if (!jcr->no_attributes && jcr->spool_attributes) {
512       return open_attr_spool_file(jcr, jcr->dir_bsock);
513    }
514    return true;
515 }
516
517 bool discard_attribute_spool(JCR *jcr)
518 {
519    if (are_attributes_spooled(jcr)) {
520       return close_attr_spool_file(jcr, jcr->dir_bsock);
521    }
522    return true;
523 }
524
525 static void update_attr_spool_size(ssize_t size)
526 {
527    P(mutex);
528    if (size > 0) {
529      if ((spool_stats.attr_size - size) > 0) {
530         spool_stats.attr_size -= size;
531      } else {
532         spool_stats.attr_size = 0;
533      }
534    }
535    V(mutex);
536 }
537
538 bool commit_attribute_spool(JCR *jcr)
539 {
540    off_t size;
541    char ec1[30];
542
543    if (are_attributes_spooled(jcr)) {
544       if (fseeko(jcr->dir_bsock->spool_fd, 0, SEEK_END) != 0) {
545          berrno be;
546          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
547               be.strerror());
548          goto bail_out;
549       }
550       size = ftello(jcr->dir_bsock->spool_fd);
551       if (size < 0) {
552          berrno be;
553          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
554               be.strerror());
555          goto bail_out;
556       }
557       P(mutex);
558       if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
559          spool_stats.max_attr_size = spool_stats.attr_size + size;
560       }
561       spool_stats.attr_size += size;
562       V(mutex);
563       Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
564             edit_uint64_with_commas(size, ec1));
565       bnet_despool_to_bsock(jcr->dir_bsock, update_attr_spool_size, size);
566       return close_attr_spool_file(jcr, jcr->dir_bsock);
567    }
568    return true;
569
570 bail_out:
571    close_attr_spool_file(jcr, jcr->dir_bsock);
572    return false;
573 }
574
575 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
576 {
577    Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
578       jcr->Job, fd);
579 }
580
581
582 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
583 {
584    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
585
586    make_unique_spool_filename(jcr, &name, bs->fd);
587    bs->spool_fd = fopen(name, "w+b");
588    if (!bs->spool_fd) {
589       berrno be;
590       Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
591            be.strerror());
592       free_pool_memory(name);
593       return false;
594    }
595    P(mutex);
596    spool_stats.attr_jobs++;
597    V(mutex);
598    free_pool_memory(name);
599    return true;
600 }
601
602 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
603 {
604    POOLMEM *name;
605
606    if (!bs->spool_fd) {
607       return true;
608    }
609    name = get_pool_memory(PM_MESSAGE);
610    P(mutex);
611    spool_stats.attr_jobs--;
612    spool_stats.total_attr_jobs++;
613    V(mutex);
614    make_unique_spool_filename(jcr, &name, bs->fd);
615    fclose(bs->spool_fd);
616    unlink(name);
617    free_pool_memory(name);
618    bs->spool_fd = NULL;
619    bs->spool = false;
620    return true;
621 }