]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
ac7bd0f3223deda94a0935017e1d3ca5399c57a5
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2  *  Spooling code
3  *
4  *      Kern Sibbald, March 2004
5  *
6  *  Version $Id$
7  */
8 /*
9    Copyright (C) 2004-2004 Kern Sibbald and John Walker
10
11    This program is free software; you can redistribute it and/or
12    modify it under the terms of the GNU General Public License as
13    published by the Free Software Foundation; either version 2 of
14    the License, or (at your option) any later version.
15
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19    General Public License for more details.
20
21    You should have received a copy of the GNU General Public
22    License along with this program; if not, write to the Free
23    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
24    MA 02111-1307, USA.
25
26  */
27
28 #include "bacula.h"
29 #include "stored.h"
30
31 /* Forward referenced subroutines */
32 static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name);
33 static bool open_data_spool_file(JCR *jcr);
34 static bool close_data_spool_file(JCR *jcr);
35 static bool despool_data(DCR *dcr, bool commit);
36 static int  read_block_from_spool_file(DCR *dcr);
37 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
38 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
39 static bool write_spool_header(DCR *dcr);
40 static bool write_spool_data(DCR *dcr);
41
42 struct spool_stats_t {
43    uint32_t data_jobs;                /* current jobs spooling data */
44    uint32_t attr_jobs;
45    uint32_t total_data_jobs;          /* total jobs to have spooled data */
46    uint32_t total_attr_jobs;
47    int64_t max_data_size;             /* max data size */
48    int64_t max_attr_size;
49    int64_t data_size;                 /* current data size (all jobs running) */
50    int64_t attr_size;
51 };
52
53 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
54 spool_stats_t spool_stats;
55
56 /*
57  * Header for data spool record */
58 struct spool_hdr {
59    int32_t  FirstIndex;               /* FirstIndex for buffer */
60    int32_t  LastIndex;                /* LastIndex for buffer */
61    uint32_t len;                      /* length of next buffer */
62 };
63
64 enum {
65    RB_EOT = 1,
66    RB_ERROR,
67    RB_OK
68 };
69
70 void list_spool_stats(BSOCK *bs)
71 {
72    char ed1[30], ed2[30];
73    if (spool_stats.data_jobs || spool_stats.max_data_size) {
74       bnet_fsend(bs, "Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n",
75          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
76          spool_stats.total_data_jobs,
77          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
78    }
79    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
80       bnet_fsend(bs, "Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n",
81          spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
82          spool_stats.total_attr_jobs,
83          edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
84    }
85 }
86
87 bool begin_data_spool(JCR *jcr)
88 {
89    bool stat = true;
90    if (jcr->spool_data) {
91       Dmsg0(100, "Turning on data spooling\n");
92       jcr->dcr->spool_data = true;
93       stat = open_data_spool_file(jcr);
94       if (stat) {
95          jcr->dcr->spooling = true;
96          Jmsg(jcr, M_INFO, 0, _("Spooling data ...\n"));
97          P(mutex);
98          spool_stats.data_jobs++;
99          V(mutex);
100       }
101    }
102    return stat;
103 }
104
105 bool discard_data_spool(JCR *jcr)
106 {
107    if (jcr->dcr->spooling) {
108       Dmsg0(100, "Data spooling discarded\n");
109       return close_data_spool_file(jcr);
110    }
111    return true;
112 }
113
114 bool commit_data_spool(JCR *jcr)
115 {
116    bool stat;
117
118    if (jcr->dcr->spooling) {
119       Dmsg0(100, "Committing spooled data\n");
120       stat = despool_data(jcr->dcr, true /*commit*/);
121       if (!stat) {
122          Pmsg1(000, "Bad return from despool WroteVol=%d\n", jcr->dcr->WroteVol);
123          close_data_spool_file(jcr);
124          return false;
125       }
126       return close_data_spool_file(jcr);
127    }
128    return true;
129 }
130
131 static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name)
132 {
133    const char *dir;
134    if (jcr->dcr->dev->device->spool_directory) {
135       dir = jcr->dcr->dev->device->spool_directory;
136    } else {
137       dir = working_directory;
138    }
139    Mmsg(name, "%s/%s.data.spool.%s.%s", dir, my_name, jcr->Job, 
140         jcr->dcr->device->hdr.name);
141 }
142
143
144 static bool open_data_spool_file(JCR *jcr)
145 {
146    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
147    int spool_fd;
148
149    make_unique_data_spool_filename(jcr, &name);
150    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
151       jcr->dcr->spool_fd = spool_fd;
152       jcr->spool_attributes = true;
153    } else {
154       berrno be;
155       Jmsg(jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
156            be.strerror());
157       free_pool_memory(name);
158       return false;
159    }
160    Dmsg1(100, "Created spool file: %s\n", name);
161    free_pool_memory(name);
162    return true;
163 }
164
165 static bool close_data_spool_file(JCR *jcr)
166 {
167    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
168
169    P(mutex);
170    spool_stats.data_jobs--;
171    spool_stats.total_data_jobs++;
172    if (spool_stats.data_size < jcr->dcr->spool_size) {
173       spool_stats.data_size = 0;
174    } else {
175       spool_stats.data_size -= jcr->dcr->spool_size;
176    }
177    jcr->dcr->spool_size = 0;
178    V(mutex);
179
180    make_unique_data_spool_filename(jcr, &name);
181    close(jcr->dcr->spool_fd);
182    jcr->dcr->spool_fd = -1;
183    jcr->dcr->spooling = false;
184    unlink(name);
185    Dmsg1(100, "Deleted spool file: %s\n", name);
186    free_pool_memory(name);
187    return true;
188 }
189
190 static const char *spool_name = "*spool*";
191
192 static bool despool_data(DCR *dcr, bool commit)
193 {
194    DEVICE *rdev;
195    DCR *rdcr;
196    bool ok = true;
197    DEV_BLOCK *block;
198    JCR *jcr = dcr->jcr;
199    int stat;
200    char ec1[50];
201
202    Dmsg0(100, "Despooling data\n");
203    Jmsg(jcr, M_INFO, 0, _("%s spooled data to Volume. Despooling %s bytes ...\n"),
204         commit?"Committing":"Writing",
205         edit_uint64_with_commas(jcr->dcr->spool_size, ec1));
206    dcr->spooling = false;
207    lock_device(dcr->dev);
208    dcr->dev_locked = true;
209
210    /*
211     * This is really quite kludgy and should be fixed some time.
212     * We create a dev structure to read from the spool file
213     * in rdev and rdcr.
214     */
215    rdev = (DEVICE *)malloc(sizeof(DEVICE));
216    memset(rdev, 0, sizeof(DEVICE));
217    rdev->dev_name = get_memory(strlen(spool_name)+1);
218    bstrncpy(rdev->dev_name, spool_name, sizeof(rdev->dev_name));
219    rdev->errmsg = get_pool_memory(PM_EMSG);
220    *rdev->errmsg = 0;
221    rdev->max_block_size = dcr->dev->max_block_size;
222    rdev->min_block_size = dcr->dev->min_block_size;
223    rdev->device = dcr->dev->device;
224    rdcr = new_dcr(NULL, rdev);
225    rdcr->spool_fd = dcr->spool_fd;
226    rdcr->jcr = jcr;                   /* set a valid jcr */
227    block = dcr->block;                /* save block */
228    dcr->block = rdcr->block;          /* make read and write block the same */
229
230    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
231    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
232
233    for ( ; ok; ) {
234       if (job_canceled(jcr)) {
235          ok = false;
236          break;
237       }
238       stat = read_block_from_spool_file(rdcr);
239       if (stat == RB_EOT) {
240          break;
241       } else if (stat == RB_ERROR) {
242          ok = false;
243          break;
244       }
245       ok = write_block_to_device(dcr);
246       Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
247    }
248    dcr->block = block;                /* reset block */
249
250    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
251    if (ftruncate(rdcr->spool_fd, 0) != 0) {
252       berrno be;
253       Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
254          be.strerror());
255       Pmsg1(000, "Bad return from ftruncate. ERR=%s\n", be.strerror());
256       ok = false;
257    }
258
259    P(mutex);
260    if (spool_stats.data_size < dcr->spool_size) {
261       spool_stats.data_size = 0;
262    } else {
263       spool_stats.data_size -= dcr->spool_size;
264    }
265    V(mutex);
266    P(dcr->dev->spool_mutex);
267    dcr->dev->spool_size -= dcr->spool_size;
268    dcr->spool_size = 0;               /* zap size in input dcr */
269    V(dcr->dev->spool_mutex);
270    free_memory(rdev->dev_name);
271    free_pool_memory(rdev->errmsg);
272    /* Be careful to NULL the jcr and free rdev after free_dcr() */
273    rdcr->jcr = NULL;
274    free_dcr(rdcr);
275    free(rdev);
276    unlock_device(dcr->dev);
277    dcr->dev_locked = false;
278    dcr->spooling = true;           /* turn on spooling again */
279    return ok;
280 }
281
282 /*
283  * Read a block from the spool file
284  *
285  *  Returns RB_OK on success
286  *          RB_EOT when file done
287  *          RB_ERROR on error
288  */
289 static int read_block_from_spool_file(DCR *dcr)
290 {
291    uint32_t rlen;
292    ssize_t stat;
293    spool_hdr hdr;
294    DEV_BLOCK *block = dcr->block;
295
296    rlen = sizeof(hdr);
297    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
298    if (stat == 0) {
299       Dmsg0(100, "EOT on spool read.\n");
300       return RB_EOT;
301    } else if (stat != (ssize_t)rlen) {
302       if (stat == -1) {
303          berrno be;
304          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
305               be.strerror());
306       } else {
307          Pmsg2(000, "Spool read error. Wanted %u bytes, got %u\n", rlen, stat);
308          Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %u\n"), rlen, stat);
309       }
310       return RB_ERROR;
311    }
312    rlen = hdr.len;
313    if (rlen > block->buf_len) {
314       Pmsg2(000, "Spool block too big. Max %u bytes, got %u\n", block->buf_len, rlen);
315       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
316       return RB_ERROR;
317    }
318    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
319    if (stat != (ssize_t)rlen) {
320       Pmsg2(000, "Spool data read error. Wanted %u bytes, got %u\n", rlen, stat);
321       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %u\n"), rlen, stat);
322       return RB_ERROR;
323    }
324    /* Setup write pointers */
325    block->binbuf = rlen;
326    block->bufp = block->buf + block->binbuf;
327    block->FirstIndex = hdr.FirstIndex;
328    block->LastIndex = hdr.LastIndex;
329    block->VolSessionId = dcr->jcr->VolSessionId;
330    block->VolSessionTime = dcr->jcr->VolSessionTime;
331    Dmsg2(800, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
332    return RB_OK;
333 }
334
335 /*
336  * Write a block to the spool file
337  *
338  *  Returns: true on success or EOT
339  *           false on hard error
340  */
341 bool write_block_to_spool_file(DCR *dcr)
342 {
343    uint32_t wlen, hlen;               /* length to write */
344    bool despool = false;
345    DEV_BLOCK *block = dcr->block;
346
347    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
348    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
349       return true;
350    }
351
352    hlen = sizeof(spool_hdr);
353    wlen = block->binbuf;
354    P(dcr->dev->spool_mutex);
355    dcr->spool_size += hlen + wlen;
356    dcr->dev->spool_size += hlen + wlen;
357    if ((dcr->max_spool_size > 0 && dcr->spool_size >= dcr->max_spool_size) ||
358        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
359       despool = true;
360    }
361    V(dcr->dev->spool_mutex);
362    P(mutex);
363    spool_stats.data_size += hlen + wlen;
364    if (spool_stats.data_size > spool_stats.max_data_size) {
365       spool_stats.max_data_size = spool_stats.data_size;
366    }
367    V(mutex);
368    if (despool) {
369 #ifdef xDEBUG
370       char ec1[30], ec2[30], ec3[30], ec4[30];
371       Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
372             "max_job_size=%s job_size=%s\n",
373             edit_uint64_with_commas(dcr->max_spool_size, ec1),
374             edit_uint64_with_commas(dcr->spool_size, ec2),
375             edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
376             edit_uint64_with_commas(dcr->dev->spool_size, ec4));
377 #endif
378       Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
379       if (!despool_data(dcr, false)) {
380          Pmsg0(000, "Bad return from despool in write_block.\n");
381          return false;
382       }
383       /* Despooling cleared these variables so reset them */
384       P(dcr->dev->spool_mutex);
385       dcr->spool_size += hlen + wlen;
386       dcr->dev->spool_size += hlen + wlen;
387       V(dcr->dev->spool_mutex);
388       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
389    }
390
391
392    if (!write_spool_header(dcr)) {
393       return false;
394    }
395    if (!write_spool_data(dcr)) {
396      return false;
397    }
398
399    Dmsg2(800, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
400    empty_block(block);
401    return true;
402 }
403
404 static bool write_spool_header(DCR *dcr)
405 {
406    spool_hdr hdr;
407    ssize_t stat;
408    DEV_BLOCK *block = dcr->block;
409
410    hdr.FirstIndex = block->FirstIndex;
411    hdr.LastIndex = block->LastIndex;
412    hdr.len = block->binbuf;
413
414    /* Write header */
415    for (int retry=0; retry<=1; retry++) {
416       stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
417       if (stat == -1) {
418          berrno be;
419          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
420               be.strerror());
421       }
422       if (stat != (ssize_t)sizeof(hdr)) {
423          /* If we wrote something, truncate it, then despool */
424          if (stat != -1) {
425             if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR) - stat) != 0) {
426                berrno be;
427                Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
428                   be.strerror());
429                return false;
430             }
431          }
432          if (!despool_data(dcr, false)) {
433             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
434             return false;
435          }
436          continue;                    /* try again */
437       }
438       return true;
439    }
440    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
441    return false;
442 }
443
444 static bool write_spool_data(DCR *dcr)
445 {
446    ssize_t stat;
447    DEV_BLOCK *block = dcr->block;
448
449    /* Write data */
450    for (int retry=0; retry<=1; retry++) {
451       stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
452       if (stat == -1) {
453          berrno be;
454          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
455               be.strerror());
456       }
457       if (stat != (ssize_t)block->binbuf) {
458          /*
459           * If we wrote something, truncate it and the header, then despool
460           */
461          if (stat != -1) {
462             if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR)
463                       - stat - sizeof(spool_hdr)) != 0) {
464                berrno be;
465                Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
466                   be.strerror());
467                return false;
468             }
469          }
470          if (!despool_data(dcr, false)) {
471             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
472             return false;
473          }
474          if (!write_spool_header(dcr)) {
475             return false;
476          }
477          continue;                    /* try again */
478       }
479       return true;
480    }
481    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
482    return false;
483 }
484
485
486
487 bool are_attributes_spooled(JCR *jcr)
488 {
489    return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
490 }
491
492 /*
493  * Create spool file for attributes.
494  *  This is done by "attaching" to the bsock, and when
495  *  it is called, the output is written to a file.
496  *  The actual spooling is turned on and off in
497  *  append.c only during writing of the attributes.
498  */
499 bool begin_attribute_spool(JCR *jcr)
500 {
501    if (!jcr->no_attributes && jcr->spool_attributes) {
502       return open_attr_spool_file(jcr, jcr->dir_bsock);
503    }
504    return true;
505 }
506
507 bool discard_attribute_spool(JCR *jcr)
508 {
509    if (are_attributes_spooled(jcr)) {
510       return close_attr_spool_file(jcr, jcr->dir_bsock);
511    }
512    return true;
513 }
514
515 static void update_attr_spool_size(ssize_t size)
516 {
517    P(mutex);
518    if (size > 0) {
519      if ((spool_stats.attr_size - size) > 0) {
520         spool_stats.attr_size -= size;
521      } else {
522         spool_stats.attr_size = 0;
523      }
524    }
525    V(mutex);
526 }
527
528 bool commit_attribute_spool(JCR *jcr)
529 {
530    ssize_t size;
531    char ec1[30];
532
533    if (are_attributes_spooled(jcr)) {
534       if (fseek(jcr->dir_bsock->spool_fd, 0, SEEK_END) != 0) {
535          berrno be;
536          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
537               be.strerror());
538       }
539       size = ftell(jcr->dir_bsock->spool_fd);
540       P(mutex);
541       if (size > 0) {
542         if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
543            spool_stats.max_attr_size = spool_stats.attr_size + size;
544         }
545       }
546       spool_stats.attr_size += size;
547       V(mutex);
548       Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
549             edit_uint64_with_commas(size, ec1));
550       bnet_despool_to_bsock(jcr->dir_bsock, update_attr_spool_size, size);
551       return close_attr_spool_file(jcr, jcr->dir_bsock);
552    }
553    return true;
554 }
555
556 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
557 {
558    Mmsg(name, "%s/%s.attr.spool.%s.%d", working_directory, my_name,
559       jcr->Job, fd);
560 }
561
562
563 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
564 {
565    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
566
567    make_unique_spool_filename(jcr, &name, bs->fd);
568    bs->spool_fd = fopen(name, "w+");
569    if (!bs->spool_fd) {
570       berrno be;
571       Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
572            be.strerror());
573       free_pool_memory(name);
574       return false;
575    }
576    P(mutex);
577    spool_stats.attr_jobs++;
578    V(mutex);
579    free_pool_memory(name);
580    return true;
581 }
582
583 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
584 {
585    POOLMEM *name;
586
587    if (!bs->spool_fd) {
588       return true;
589    }
590    name = get_pool_memory(PM_MESSAGE);
591    P(mutex);
592    spool_stats.attr_jobs--;
593    spool_stats.total_attr_jobs++;
594    V(mutex);
595    make_unique_spool_filename(jcr, &name, bs->fd);
596    fclose(bs->spool_fd);
597    unlink(name);
598    free_pool_memory(name);
599    bs->spool_fd = NULL;
600    bs->spool = false;
601    return true;
602 }