]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
b10360febfd4429988547855ede41d77527f451d
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2  *  Spooling code 
3  *
4  *      Kern Sibbald, March 2004
5  *
6  *  Version $Id$
7  */
8 /*
9    Copyright (C) 2000-2004 Kern Sibbald and John Walker
10
11    This program is free software; you can redistribute it and/or
12    modify it under the terms of the GNU General Public License as
13    published by the Free Software Foundation; either version 2 of
14    the License, or (at your option) any later version.
15
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19    General Public License for more details.
20
21    You should have received a copy of the GNU General Public
22    License along with this program; if not, write to the Free
23    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
24    MA 02111-1307, USA.
25
26  */
27
28 #include "bacula.h"
29 #include "stored.h"
30
31 /* Forward referenced subroutines */
32 static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name);
33 static bool open_data_spool_file(JCR *jcr);
34 static bool close_data_spool_file(JCR *jcr);
35 static bool despool_data(DCR *dcr, bool commit);
36 static int  read_block_from_spool_file(DCR *dcr, DEV_BLOCK *block);
37 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
38 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
39 static bool write_spool_header(DCR *dcr, DEV_BLOCK *block);
40 static bool write_spool_data(DCR *dcr, DEV_BLOCK *block);
41
42 struct spool_stats_t {
43    uint32_t data_jobs;                /* current jobs spooling data */
44    uint32_t attr_jobs;                
45    uint32_t total_data_jobs;          /* total jobs to have spooled data */
46    uint32_t total_attr_jobs;
47    int64_t max_data_size;             /* max data size */
48    int64_t max_attr_size;
49    int64_t data_size;                 /* current data size (all jobs running) */
50    int64_t attr_size;
51 };
52
53 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
54 spool_stats_t spool_stats;
55
56 /* 
57  * Header for data spool record */
58 struct spool_hdr {
59    int32_t  FirstIndex;               /* FirstIndex for buffer */
60    int32_t  LastIndex;                /* LastIndex for buffer */
61    uint32_t len;                      /* length of next buffer */
62 };
63
64 enum {
65    RB_EOT = 1,
66    RB_ERROR,
67    RB_OK
68 };
69
70 void list_spool_stats(BSOCK *bs)
71 {
72    char ed1[30], ed2[30];
73    if (spool_stats.data_jobs || spool_stats.max_data_size) {
74       bnet_fsend(bs, "Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n",
75          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
76          spool_stats.total_data_jobs, 
77          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
78    }
79    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
80       bnet_fsend(bs, "Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n",
81          spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1), 
82          spool_stats.total_attr_jobs, 
83          edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
84    }
85 }
86
87 bool begin_data_spool(JCR *jcr)
88 {
89    bool stat = true;
90    if (jcr->spool_data) {
91       Dmsg0(100, "Turning on data spooling\n");
92       jcr->dcr->spool_data = true;
93       stat = open_data_spool_file(jcr);
94       if (stat) {
95          jcr->dcr->spooling = true;
96          Jmsg(jcr, M_INFO, 0, _("Spooling data ...\n"));
97          P(mutex);
98          spool_stats.data_jobs++;
99          V(mutex);
100       }
101    }
102    return stat;
103 }
104
105 bool discard_data_spool(JCR *jcr)
106 {
107    if (jcr->dcr->spooling) {
108       Dmsg0(100, "Data spooling discarded\n");
109       return close_data_spool_file(jcr);
110    }
111    return true;
112 }
113
114 bool commit_data_spool(JCR *jcr)
115 {
116    bool stat;
117
118    if (jcr->dcr->spooling) {
119       Dmsg0(100, "Committing spooled data\n");
120       stat = despool_data(jcr->dcr, true /*commit*/);
121       if (!stat) {
122          Dmsg1(000, "Bad return from despool WroteVol=%d\n", jcr->dcr->WroteVol);
123          close_data_spool_file(jcr);
124          return false;
125       }
126       return close_data_spool_file(jcr);
127    }
128    return true;
129 }
130
131 static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name)
132 {
133    const char *dir;  
134    if (jcr->dcr->dev->device->spool_directory) {
135       dir = jcr->dcr->dev->device->spool_directory;
136    } else {
137       dir = working_directory;
138    }
139    Mmsg(name, "%s/%s.data.spool.%s.%s", dir, my_name, jcr->Job, jcr->device->hdr.name);
140 }
141
142
143 static bool open_data_spool_file(JCR *jcr)
144 {
145    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
146    int spool_fd;
147
148    make_unique_data_spool_filename(jcr, &name);
149    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
150       jcr->dcr->spool_fd = spool_fd;
151       jcr->spool_attributes = true;
152    } else {
153       berrno be;
154       Jmsg(jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
155            be.strerror());
156       free_pool_memory(name);
157       return false;
158    }
159    Dmsg1(100, "Created spool file: %s\n", name);
160    free_pool_memory(name);
161    return true;
162 }
163
164 static bool close_data_spool_file(JCR *jcr)
165 {
166    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
167
168    P(mutex);
169    spool_stats.data_jobs--;
170    spool_stats.total_data_jobs++;
171    if (spool_stats.data_size < jcr->dcr->spool_size) {
172       spool_stats.data_size = 0;
173    } else {
174       spool_stats.data_size -= jcr->dcr->spool_size;
175    }
176    jcr->dcr->spool_size = 0;
177    V(mutex);
178
179    make_unique_data_spool_filename(jcr, &name);
180    close(jcr->dcr->spool_fd);
181    jcr->dcr->spool_fd = -1;
182    jcr->dcr->spooling = false;
183    unlink(name);
184    Dmsg1(100, "Deleted spool file: %s\n", name);
185    free_pool_memory(name);
186    return true;
187 }
188
189 static const char *spool_name = "*spool*";
190
191 static bool despool_data(DCR *dcr, bool commit) 
192 {
193    DEVICE *rdev;
194    DCR *rdcr;
195    bool ok = true;
196    DEV_BLOCK *block;
197    JCR *jcr = dcr->jcr;
198    int stat;
199    char ec1[50];
200
201    Dmsg0(100, "Despooling data\n");
202    Jmsg(jcr, M_INFO, 0, _("%s spooled data to Volume. Despooling %s bytes ...\n"),
203         commit?"Committing":"Writting",
204         edit_uint64_with_commas(jcr->dcr->dev->spool_size, ec1));
205    dcr->spooling = false;
206    lock_device(dcr->dev);
207    dcr->dev_locked = true; 
208
209    /* 
210     * This is really quite kludgy and should be fixed some time.
211     * We create a dev structure to read from the spool file 
212     * in rdev and rdcr.
213     */
214    rdev = (DEVICE *)malloc(sizeof(DEVICE));
215    memset(rdev, 0, sizeof(DEVICE));
216    rdev->dev_name = get_memory(strlen(spool_name)+1);
217    strcpy(rdev->dev_name, spool_name);
218    rdev->errmsg = get_pool_memory(PM_EMSG);
219    *rdev->errmsg = 0;
220    rdev->max_block_size = dcr->dev->max_block_size;
221    rdev->min_block_size = dcr->dev->min_block_size;
222    rdev->device = dcr->dev->device;
223    rdcr = new_dcr(NULL, rdev);
224    rdcr->spool_fd = dcr->spool_fd; 
225    rdcr->jcr = jcr;                   /* set a valid jcr */
226    block = rdcr->block;
227
228    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
229    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
230
231    for ( ; ok; ) {
232       if (job_canceled(jcr)) {
233          ok = false;
234          break;
235       }
236       stat = read_block_from_spool_file(rdcr, block);
237       if (stat == RB_EOT) {
238          break;
239       } else if (stat == RB_ERROR) {
240          ok = false;
241          break;
242       }
243       ok = write_block_to_device(dcr, block);
244       Dmsg3(100, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
245    }
246
247    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
248    if (ftruncate(rdcr->spool_fd, 0) != 0) {
249       berrno be;
250       Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"), 
251          be.strerror());
252       Dmsg1(000, "Bad return from ftruncate. ERR=%s\n", be.strerror());
253       ok = false;
254    }
255
256    P(mutex);
257    if (spool_stats.data_size < dcr->spool_size) {
258       spool_stats.data_size = 0;
259    } else {
260       spool_stats.data_size -= dcr->spool_size;
261    }
262    V(mutex);
263    P(dcr->dev->spool_mutex);
264    dcr->dev->spool_size -= dcr->spool_size;
265    dcr->spool_size = 0;               /* zap size in input dcr */
266    V(dcr->dev->spool_mutex);
267    free_memory(rdev->dev_name);
268    free_pool_memory(rdev->errmsg);
269    /* Be careful to NULL the jcr and free rdev after free_dcr() */
270    rdcr->jcr = NULL;
271    free_dcr(rdcr);
272    free(rdev);
273    unlock_device(dcr->dev);
274    dcr->dev_locked = false;
275    dcr->spooling = true;           /* turn on spooling again */
276    return ok;
277 }
278
279 /*
280  * Read a block from the spool file
281  * 
282  *  Returns RB_OK on success
283  *          RB_EOT when file done
284  *          RB_ERROR on error
285  */
286 static int read_block_from_spool_file(DCR *dcr, DEV_BLOCK *block)
287 {
288    uint32_t rlen;
289    ssize_t stat;
290    spool_hdr hdr;
291
292    rlen = sizeof(hdr);
293    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
294    if (stat == 0) {
295       Dmsg0(100, "EOT on spool read.\n");
296       return RB_EOT;
297    } else if (stat != (ssize_t)rlen) {
298       if (stat == -1) {
299          berrno be;
300          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"), 
301               be.strerror());
302       } else {
303          Dmsg2(000, "Spool read error. Wanted %u bytes, got %u\n", rlen, stat);
304          Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %u\n"), rlen, stat);
305       }
306       return RB_ERROR;
307    }
308    rlen = hdr.len;
309    if (rlen > block->buf_len) {
310       Dmsg2(000, "Spool block too big. Max %u bytes, got %u\n", block->buf_len, rlen);
311       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
312       return RB_ERROR;
313    }
314    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
315    if (stat != (ssize_t)rlen) {
316       Dmsg2(000, "Spool data read error. Wanted %u bytes, got %u\n", rlen, stat);
317       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %u\n"), rlen, stat);
318       return RB_ERROR;
319    }
320    /* Setup write pointers */
321    block->binbuf = rlen;
322    block->bufp = block->buf + block->binbuf;
323    block->FirstIndex = hdr.FirstIndex;
324    block->LastIndex = hdr.LastIndex;
325    block->VolSessionId = dcr->jcr->VolSessionId;
326    block->VolSessionTime = dcr->jcr->VolSessionTime;
327    Dmsg2(100, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
328    return RB_OK;
329 }
330
331 /*
332  * Write a block to the spool file
333  *
334  *  Returns: true on success or EOT
335  *           false on hard error
336  */
337 bool write_block_to_spool_file(DCR *dcr, DEV_BLOCK *block)
338 {
339    uint32_t wlen, hlen;               /* length to write */
340    bool despool = false;
341
342    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
343    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
344       return true;
345    }
346
347    hlen = sizeof(spool_hdr);
348    wlen = block->binbuf;
349    P(dcr->dev->spool_mutex);
350    dcr->spool_size += hlen + wlen;
351    dcr->dev->spool_size += hlen + wlen;
352    if ((dcr->max_spool_size > 0 && dcr->spool_size >= dcr->max_spool_size) ||
353        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
354       despool = true;
355    }
356    V(dcr->dev->spool_mutex);
357    P(mutex);
358    spool_stats.data_size += hlen + wlen;
359    if (spool_stats.data_size > spool_stats.max_data_size) {
360       spool_stats.max_data_size = spool_stats.data_size;
361    }
362    V(mutex);
363    if (despool) {
364 #ifdef xDEBUG 
365       char ec1[30], ec2[30], ec3[30], ec4[30];
366       Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
367             "max_job_size=%s job_size=%s\n", 
368             edit_uint64_with_commas(dcr->max_spool_size, ec1),
369             edit_uint64_with_commas(dcr->spool_size, ec2),
370             edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
371             edit_uint64_with_commas(dcr->dev->spool_size, ec4));
372 #endif
373       Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
374       if (!despool_data(dcr, false)) {
375          Dmsg0(000, "Bad return from despool in write_block.\n");
376          return false;
377       }
378       /* Despooling cleared these variables so reset them */
379       P(dcr->dev->spool_mutex);
380       dcr->spool_size += hlen + wlen;
381       dcr->dev->spool_size += hlen + wlen;
382       V(dcr->dev->spool_mutex);
383       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
384    }  
385
386
387    if (!write_spool_header(dcr, block)) {
388       return false;
389    }
390    if (!write_spool_data(dcr, block)) {
391      return false;
392    }
393
394    Dmsg2(100, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
395    empty_block(block);
396    return true;
397 }
398
399 static bool write_spool_header(DCR *dcr, DEV_BLOCK *block)
400 {
401    spool_hdr hdr;   
402    ssize_t stat;
403
404    hdr.FirstIndex = block->FirstIndex;
405    hdr.LastIndex = block->LastIndex;
406    hdr.len = block->binbuf;
407
408    /* Write header */
409    for (int retry=0; retry<=1; retry++) {
410       stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
411       if (stat == -1) {
412          berrno be;
413          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"), 
414               be.strerror());
415       }
416       if (stat != (ssize_t)sizeof(hdr)) {
417          /* If we wrote something, truncate it, then despool */
418          if (stat != -1) {
419             if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR) - stat) != 0) {
420                berrno be;
421                Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"), 
422                   be.strerror());
423                return false;
424             }
425          }
426          if (!despool_data(dcr, false)) {
427             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
428             return false;
429          }
430          continue;                    /* try again */
431       }
432       return true;
433    }
434    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
435    return false;
436 }
437
438 static bool write_spool_data(DCR *dcr, DEV_BLOCK *block)
439 {
440    ssize_t stat;
441
442    /* Write data */
443    for (int retry=0; retry<=1; retry++) {
444       stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
445       if (stat == -1) {
446          berrno be;
447          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
448               be.strerror());
449       }
450       if (stat != (ssize_t)block->binbuf) {
451          /* 
452           * If we wrote something, truncate it and the header, then despool
453           */
454          if (stat != -1) {
455             if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR)
456                       - stat - sizeof(spool_hdr)) != 0) {
457                berrno be;
458                Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"), 
459                   be.strerror());
460                return false;
461             }
462          }
463          if (!despool_data(dcr, false)) {
464             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
465             return false;
466          }
467          if (!write_spool_header(dcr, block)) {
468             return false;
469          }
470          continue;                    /* try again */
471       }
472       return true;
473    }
474    Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
475    return false;
476 }
477
478
479
480 bool are_attributes_spooled(JCR *jcr)
481 {
482    return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
483 }
484
485 /* 
486  * Create spool file for attributes.
487  *  This is done by "attaching" to the bsock, and when
488  *  it is called, the output is written to a file.
489  *  The actual spooling is turned on and off in
490  *  append.c only during writing of the attributes.
491  */
492 bool begin_attribute_spool(JCR *jcr)
493 {
494    if (!jcr->no_attributes && jcr->spool_attributes) {
495       return open_attr_spool_file(jcr, jcr->dir_bsock);
496    }
497    return true;
498 }
499
500 bool discard_attribute_spool(JCR *jcr)
501 {
502    if (are_attributes_spooled(jcr)) {
503       return close_attr_spool_file(jcr, jcr->dir_bsock);
504    }
505    return true;
506 }
507
508 static void update_attr_spool_size(ssize_t size)
509 {
510    P(mutex);
511    if (size > 0) {
512      if ((spool_stats.attr_size - size) > 0) {
513         spool_stats.attr_size -= size;
514      } else {
515         spool_stats.attr_size = 0;
516      }
517    }
518    V(mutex);
519 }
520
521 bool commit_attribute_spool(JCR *jcr)
522 {
523    ssize_t size;
524    char ec1[30];
525
526    if (are_attributes_spooled(jcr)) {
527       if (fseek(jcr->dir_bsock->spool_fd, 0, SEEK_END) != 0) {
528          berrno be;
529          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
530               be.strerror());
531       }
532       size = ftell(jcr->dir_bsock->spool_fd);
533       P(mutex);
534       if (size > 0) {
535         if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
536            spool_stats.max_attr_size = spool_stats.attr_size + size;
537         } 
538       }
539       spool_stats.attr_size += size;
540       V(mutex);
541       Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to DIR. Despooling %s bytes ...\n"),
542             edit_uint64_with_commas(size, ec1));
543       bnet_despool_to_bsock(jcr->dir_bsock, update_attr_spool_size, size);
544       return close_attr_spool_file(jcr, jcr->dir_bsock);
545    }
546    return true;
547 }
548
549 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
550 {
551    Mmsg(name, "%s/%s.attr.spool.%s.%d", working_directory, my_name,
552       jcr->Job, fd);
553 }
554
555
556 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
557 {
558    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
559
560    make_unique_spool_filename(jcr, &name, bs->fd);
561    bs->spool_fd = fopen(mp_chr(name), "w+");
562    if (!bs->spool_fd) {
563       berrno be;
564       Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
565            be.strerror());
566       free_pool_memory(name);
567       return false;
568    }
569    P(mutex);
570    spool_stats.attr_jobs++;
571    V(mutex);
572    free_pool_memory(name);
573    return true;
574 }
575
576 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
577 {
578    POOLMEM *name;
579     
580    if (!bs->spool_fd) {
581       return true;
582    }
583    name = get_pool_memory(PM_MESSAGE);
584    P(mutex);
585    spool_stats.attr_jobs--;
586    spool_stats.total_attr_jobs++;
587    V(mutex);
588    make_unique_spool_filename(jcr, &name, bs->fd);
589    fclose(bs->spool_fd);
590    unlink(mp_chr(name));
591    free_pool_memory(name);
592    bs->spool_fd = NULL;
593    bs->spool = false;
594    return true;
595 }