]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
Strip pathname portion off all message routines that print filename:line.
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2  *  Spooling code
3  *
4  *      Kern Sibbald, March 2004
5  *
6  *  Version $Id$
7  */
8 /*
9    Copyright (C) 2004-2006 Kern Sibbald
10
11    This program is free software; you can redistribute it and/or
12    modify it under the terms of the GNU General Public License
13    version 2 as amended with additional clauses defined in the
14    file LICENSE in the main source directory.
15
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
19    the file LICENSE for additional details.
20
21  */
22
23 #include "bacula.h"
24 #include "stored.h"
25
26 /* Forward referenced subroutines */
27 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name);
28 static bool open_data_spool_file(DCR *dcr);
29 static bool close_data_spool_file(DCR *dcr);
30 static bool despool_data(DCR *dcr, bool commit);
31 static int  read_block_from_spool_file(DCR *dcr);
32 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
33 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
34 static bool write_spool_header(DCR *dcr);
35 static bool write_spool_data(DCR *dcr);
36
37 struct spool_stats_t {
38    uint32_t data_jobs;                /* current jobs spooling data */
39    uint32_t attr_jobs;
40    uint32_t total_data_jobs;          /* total jobs to have spooled data */
41    uint32_t total_attr_jobs;
42    int64_t max_data_size;             /* max data size */
43    int64_t max_attr_size;
44    int64_t data_size;                 /* current data size (all jobs running) */
45    int64_t attr_size;
46 };
47
48 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
49 spool_stats_t spool_stats;
50
51 /*
52  * Header for data spool record */
53 struct spool_hdr {
54    int32_t  FirstIndex;               /* FirstIndex for buffer */
55    int32_t  LastIndex;                /* LastIndex for buffer */
56    uint32_t len;                      /* length of next buffer */
57 };
58
59 enum {
60    RB_EOT = 1,
61    RB_ERROR,
62    RB_OK
63 };
64
65 void list_spool_stats(BSOCK *bs)
66 {
67    char ed1[30], ed2[30];
68    if (spool_stats.data_jobs || spool_stats.max_data_size) {
69       bnet_fsend(bs, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
70          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
71          spool_stats.total_data_jobs,
72          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
73    }
74    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
75       bnet_fsend(bs, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
76          spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
77          spool_stats.total_attr_jobs,
78          edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
79    }
80 }
81
82 bool begin_data_spool(DCR *dcr)
83 {
84    bool stat = true;
85    if (dcr->jcr->spool_data) {
86       Dmsg0(100, "Turning on data spooling\n");
87       dcr->spool_data = true;
88       stat = open_data_spool_file(dcr);
89       if (stat) {
90          dcr->spooling = true;
91          Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data ...\n"));
92          P(mutex);
93          spool_stats.data_jobs++;
94          V(mutex);
95       }
96    }
97    return stat;
98 }
99
100 bool discard_data_spool(DCR *dcr)
101 {
102    if (dcr->spooling) {
103       Dmsg0(100, "Data spooling discarded\n");
104       return close_data_spool_file(dcr);
105    }
106    return true;
107 }
108
109 bool commit_data_spool(DCR *dcr)
110 {
111    bool stat;
112
113    if (dcr->spooling) {
114       Dmsg0(100, "Committing spooled data\n");
115       stat = despool_data(dcr, true /*commit*/);
116       if (!stat) {
117          Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
118          close_data_spool_file(dcr);
119          return false;
120       }
121       return close_data_spool_file(dcr);
122    }
123    return true;
124 }
125
126 static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
127 {
128    const char *dir;
129    if (dcr->dev->device->spool_directory) {
130       dir = dcr->dev->device->spool_directory;
131    } else {
132       dir = working_directory;
133    }
134    Mmsg(name, "%s/%s.data.%s.%s.spool", dir, my_name, dcr->jcr->Job, 
135         dcr->device->hdr.name);
136 }
137
138
139 static bool open_data_spool_file(DCR *dcr)
140 {
141    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
142    int spool_fd;
143
144    make_unique_data_spool_filename(dcr, &name);
145    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
146       dcr->spool_fd = spool_fd;
147       dcr->jcr->spool_attributes = true;
148    } else {
149       berrno be;
150       Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
151            be.strerror());
152       free_pool_memory(name);
153       return false;
154    }
155    Dmsg1(100, "Created spool file: %s\n", name);
156    free_pool_memory(name);
157    return true;
158 }
159
160 static bool close_data_spool_file(DCR *dcr)
161 {
162    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
163
164    P(mutex);
165    spool_stats.data_jobs--;
166    spool_stats.total_data_jobs++;
167    if (spool_stats.data_size < dcr->job_spool_size) {
168       spool_stats.data_size = 0;
169    } else {
170       spool_stats.data_size -= dcr->job_spool_size;
171    }
172    dcr->job_spool_size = 0;
173    V(mutex);
174
175    make_unique_data_spool_filename(dcr, &name);
176    close(dcr->spool_fd);
177    dcr->spool_fd = -1;
178    dcr->spooling = false;
179    unlink(name);
180    Dmsg1(100, "Deleted spool file: %s\n", name);
181    free_pool_memory(name);
182    return true;
183 }
184
185 static const char *spool_name = "*spool*";
186
187 static bool despool_data(DCR *dcr, bool commit)
188 {
189    DEVICE *rdev;
190    DCR *rdcr;
191    bool ok = true;
192    DEV_BLOCK *block;
193    JCR *jcr = dcr->jcr;
194    int stat;
195    char ec1[50];
196
197    Dmsg0(100, "Despooling data\n");
198    /* Commit means that the job is done, so we commit, otherwise, we
199     *  are despooling because of user spool size max or some error  
200     *  (e.g. filesystem full).
201     */
202    if (commit) {
203       Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
204          jcr->dcr->VolumeName,
205          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
206    } else {
207       Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
208          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
209    }
210    dcr->despool_wait = true;
211    dcr->spooling = false;
212    lock_device(dcr->dev);
213    dcr->despool_wait = false;
214    dcr->despooling = true;
215    dcr->dev_locked = true;
216
217    /*
218     * This is really quite kludgy and should be fixed some time.
219     * We create a dev structure to read from the spool file
220     * in rdev and rdcr.
221     */
222    rdev = (DEVICE *)malloc(sizeof(DEVICE));
223    memset(rdev, 0, sizeof(DEVICE));
224    rdev->dev_name = get_memory(strlen(spool_name)+1);
225    bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
226    rdev->errmsg = get_pool_memory(PM_EMSG);
227    *rdev->errmsg = 0;
228    rdev->max_block_size = dcr->dev->max_block_size;
229    rdev->min_block_size = dcr->dev->min_block_size;
230    rdev->device = dcr->dev->device;
231    rdcr = new_dcr(NULL, rdev);
232    rdcr->spool_fd = dcr->spool_fd;
233    rdcr->jcr = jcr;                   /* set a valid jcr */
234    block = dcr->block;                /* save block */
235    dcr->block = rdcr->block;          /* make read and write block the same */
236
237    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
238    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
239
240    time_t despool_start = time(NULL);
241
242    for ( ; ok; ) {
243       if (job_canceled(jcr)) {
244          ok = false;
245          break;
246       }
247       stat = read_block_from_spool_file(rdcr);
248       if (stat == RB_EOT) {
249          break;
250       } else if (stat == RB_ERROR) {
251          ok = false;
252          break;
253       }
254       ok = write_block_to_device(dcr);
255       if (!ok) {
256          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
257                dcr->dev->print_name(), dcr->dev->bstrerror());
258       }
259       Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
260    }
261
262    time_t despool_elapsed = time(NULL) - despool_start;
263
264    if (despool_elapsed == 0)
265       despool_elapsed = 1;
266
267    Jmsg(dcr->jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s bytes/second\n"),
268          despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
269          edit_uint64_with_commas(jcr->dcr->job_spool_size / despool_elapsed, ec1));
270
271    dcr->block = block;                /* reset block */
272
273    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
274    if (ftruncate(rdcr->spool_fd, 0) != 0) {
275       berrno be;
276       Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
277          be.strerror());
278       Pmsg1(000, _("Bad return from ftruncate. ERR=%s\n"), be.strerror());
279       ok = false;
280    }
281
282    P(mutex);
283    if (spool_stats.data_size < dcr->job_spool_size) {
284       spool_stats.data_size = 0;
285    } else {
286       spool_stats.data_size -= dcr->job_spool_size;
287    }
288    V(mutex);
289    P(dcr->dev->spool_mutex);
290    dcr->dev->spool_size -= dcr->job_spool_size;
291    dcr->job_spool_size = 0;            /* zap size in input dcr */
292    V(dcr->dev->spool_mutex);
293    free_memory(rdev->dev_name);
294    free_pool_memory(rdev->errmsg);
295    /* Be careful to NULL the jcr and free rdev after free_dcr() */
296    rdcr->jcr = NULL;
297    free_dcr(rdcr);
298    free(rdev);
299    dcr->dev_locked = false;
300    dcr->spooling = true;           /* turn on spooling again */
301    dcr->despooling = false;
302    unlock_device(dcr->dev);
303    return ok;
304 }
305
306 /*
307  * Read a block from the spool file
308  *
309  *  Returns RB_OK on success
310  *          RB_EOT when file done
311  *          RB_ERROR on error
312  */
313 static int read_block_from_spool_file(DCR *dcr)
314 {
315    uint32_t rlen;
316    ssize_t stat;
317    spool_hdr hdr;
318    DEV_BLOCK *block = dcr->block;
319
320    rlen = sizeof(hdr);
321    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
322    if (stat == 0) {
323       Dmsg0(100, "EOT on spool read.\n");
324       return RB_EOT;
325    } else if (stat != (ssize_t)rlen) {
326       if (stat == -1) {
327          berrno be;
328          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
329               be.strerror());
330       } else {
331          Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
332          Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
333       }
334       return RB_ERROR;
335    }
336    rlen = hdr.len;
337    if (rlen > block->buf_len) {
338       Pmsg2(000, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
339       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
340       return RB_ERROR;
341    }
342    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
343    if (stat != (ssize_t)rlen) {
344       Pmsg2(000, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
345       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %d\n"), rlen, stat);
346       return RB_ERROR;
347    }
348    /* Setup write pointers */
349    block->binbuf = rlen;
350    block->bufp = block->buf + block->binbuf;
351    block->FirstIndex = hdr.FirstIndex;
352    block->LastIndex = hdr.LastIndex;
353    block->VolSessionId = dcr->jcr->VolSessionId;
354    block->VolSessionTime = dcr->jcr->VolSessionTime;
355    Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
356    return RB_OK;
357 }
358
359 /*
360  * Write a block to the spool file
361  *
362  *  Returns: true on success or EOT
363  *           false on hard error
364  */
365 bool write_block_to_spool_file(DCR *dcr)
366 {
367    uint32_t wlen, hlen;               /* length to write */
368    bool despool = false;
369    DEV_BLOCK *block = dcr->block;
370
371    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
372    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
373       return true;
374    }
375
376    hlen = sizeof(spool_hdr);
377    wlen = block->binbuf;
378    P(dcr->dev->spool_mutex);
379    dcr->job_spool_size += hlen + wlen;
380    dcr->dev->spool_size += hlen + wlen;
381    if ((dcr->max_job_spool_size > 0 && dcr->job_spool_size >= dcr->max_job_spool_size) ||
382        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
383       despool = true;
384    }
385    V(dcr->dev->spool_mutex);
386    P(mutex);
387    spool_stats.data_size += hlen + wlen;
388    if (spool_stats.data_size > spool_stats.max_data_size) {
389       spool_stats.max_data_size = spool_stats.data_size;
390    }
391    V(mutex);
392    if (despool) {
393 #ifdef xDEBUG
394       char ec1[30], ec2[30], ec3[30], ec4[30];
395       Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
396             "max_job_size=%s job_size=%s\n",
397             edit_uint64_with_commas(dcr->max_job_spool_size, ec1),
398             edit_uint64_with_commas(dcr->job_spool_size, ec2),
399             edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
400             edit_uint64_with_commas(dcr->dev->spool_size, ec4));
401 #endif
402       Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
403       if (!despool_data(dcr, false)) {
404          Pmsg0(000, _("Bad return from despool in write_block.\n"));
405          return false;
406       }
407       /* Despooling cleared these variables so reset them */
408       P(dcr->dev->spool_mutex);
409       dcr->job_spool_size += hlen + wlen;
410       dcr->dev->spool_size += hlen + wlen;
411       V(dcr->dev->spool_mutex);
412       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
413    }
414
415
416    if (!write_spool_header(dcr)) {
417       return false;
418    }
419    if (!write_spool_data(dcr)) {
420      return false;
421    }
422
423    Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
424    empty_block(block);
425    return true;
426 }
427
428 static bool write_spool_header(DCR *dcr)
429 {
430    spool_hdr hdr;
431    ssize_t stat;
432    DEV_BLOCK *block = dcr->block;
433
434    hdr.FirstIndex = block->FirstIndex;
435    hdr.LastIndex = block->LastIndex;
436    hdr.len = block->binbuf;
437
438    /* Write header */
439    for (int retry=0; retry<=1; retry++) {
440       stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
441       if (stat == -1) {
442          berrno be;
443          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
444               be.strerror());
445       }
446       if (stat != (ssize_t)sizeof(hdr)) {
447          /* If we wrote something, truncate it, then despool */
448          if (stat != -1) {
449             if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR) - stat) != 0) {
450                berrno be;
451                Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
452                   be.strerror());
453                return false;
454             }
455          }
456          if (!despool_data(dcr, false)) {
457             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
458             return false;
459          }
460          continue;                    /* try again */
461       }
462       return true;
463    }
464    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
465    return false;
466 }
467
468 static bool write_spool_data(DCR *dcr)
469 {
470    ssize_t stat;
471    DEV_BLOCK *block = dcr->block;
472
473    /* Write data */
474    for (int retry=0; retry<=1; retry++) {
475       stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
476       if (stat == -1) {
477          berrno be;
478          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
479               be.strerror());
480       }
481       if (stat != (ssize_t)block->binbuf) {
482          /*
483           * If we wrote something, truncate it and the header, then despool
484           */
485          if (stat != -1) {
486             if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR)
487                       - stat - sizeof(spool_hdr)) != 0) {
488                berrno be;
489                Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
490                   be.strerror());
491                return false;
492             }
493          }
494          if (!despool_data(dcr, false)) {
495             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
496             return false;
497          }
498          if (!write_spool_header(dcr)) {
499             return false;
500          }
501          continue;                    /* try again */
502       }
503       return true;
504    }
505    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
506    return false;
507 }
508
509
510
511 bool are_attributes_spooled(JCR *jcr)
512 {
513    return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
514 }
515
516 /*
517  * Create spool file for attributes.
518  *  This is done by "attaching" to the bsock, and when
519  *  it is called, the output is written to a file.
520  *  The actual spooling is turned on and off in
521  *  append.c only during writing of the attributes.
522  */
523 bool begin_attribute_spool(JCR *jcr)
524 {
525    if (!jcr->no_attributes && jcr->spool_attributes) {
526       return open_attr_spool_file(jcr, jcr->dir_bsock);
527    }
528    return true;
529 }
530
531 bool discard_attribute_spool(JCR *jcr)
532 {
533    if (are_attributes_spooled(jcr)) {
534       return close_attr_spool_file(jcr, jcr->dir_bsock);
535    }
536    return true;
537 }
538
539 static void update_attr_spool_size(ssize_t size)
540 {
541    P(mutex);
542    if (size > 0) {
543      if ((spool_stats.attr_size - size) > 0) {
544         spool_stats.attr_size -= size;
545      } else {
546         spool_stats.attr_size = 0;
547      }
548    }
549    V(mutex);
550 }
551
552 bool commit_attribute_spool(JCR *jcr)
553 {
554    off_t size;
555    char ec1[30];
556
557    if (are_attributes_spooled(jcr)) {
558       if (fseeko(jcr->dir_bsock->spool_fd, 0, SEEK_END) != 0) {
559          berrno be;
560          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
561               be.strerror());
562          goto bail_out;
563       }
564       size = ftello(jcr->dir_bsock->spool_fd);
565       if (size < 0) {
566          berrno be;
567          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
568               be.strerror());
569          goto bail_out;
570       }
571       P(mutex);
572       if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
573          spool_stats.max_attr_size = spool_stats.attr_size + size;
574       }
575       spool_stats.attr_size += size;
576       V(mutex);
577       Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
578             edit_uint64_with_commas(size, ec1));
579       bnet_despool_to_bsock(jcr->dir_bsock, update_attr_spool_size, size);
580       return close_attr_spool_file(jcr, jcr->dir_bsock);
581    }
582    return true;
583
584 bail_out:
585    close_attr_spool_file(jcr, jcr->dir_bsock);
586    return false;
587 }
588
589 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
590 {
591    Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
592       jcr->Job, fd);
593 }
594
595
596 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
597 {
598    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
599
600    make_unique_spool_filename(jcr, &name, bs->fd);
601    bs->spool_fd = fopen(name, "w+b");
602    if (!bs->spool_fd) {
603       berrno be;
604       Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
605            be.strerror());
606       free_pool_memory(name);
607       return false;
608    }
609    P(mutex);
610    spool_stats.attr_jobs++;
611    V(mutex);
612    free_pool_memory(name);
613    return true;
614 }
615
616 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
617 {
618    POOLMEM *name;
619
620    if (!bs->spool_fd) {
621       return true;
622    }
623    name = get_pool_memory(PM_MESSAGE);
624    P(mutex);
625    spool_stats.attr_jobs--;
626    spool_stats.total_attr_jobs++;
627    V(mutex);
628    make_unique_spool_filename(jcr, &name, bs->fd);
629    fclose(bs->spool_fd);
630    unlink(name);
631    free_pool_memory(name);
632    bs->spool_fd = NULL;
633    bs->spool = false;
634    return true;
635 }