]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
a9caea326f61e424bdb2972bffcda37dc8f54778
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2  *  Spooling code 
3  *
4  *      Kern Sibbald, March 2004
5  *
6  *  Version $Id$
7  */
8 /*
9    Copyright (C) 2000-2004 Kern Sibbald and John Walker
10
11    This program is free software; you can redistribute it and/or
12    modify it under the terms of the GNU General Public License as
13    published by the Free Software Foundation; either version 2 of
14    the License, or (at your option) any later version.
15
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19    General Public License for more details.
20
21    You should have received a copy of the GNU General Public
22    License along with this program; if not, write to the Free
23    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
24    MA 02111-1307, USA.
25
26  */
27
28 #include "bacula.h"
29 #include "stored.h"
30
31 /* Forward referenced subroutines */
32 static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name);
33 static bool open_data_spool_file(JCR *jcr);
34 static bool close_data_spool_file(JCR *jcr);
35 static bool despool_data(DCR *dcr);
36 static int  read_block_from_spool_file(DCR *dcr, DEV_BLOCK *block);
37 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
38 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
39
40 struct spool_stats_t {
41    uint32_t data_jobs;                /* current jobs spooling data */
42    uint32_t attr_jobs;                
43    uint32_t total_data_jobs;          /* total jobs to have spooled data */
44    uint32_t total_attr_jobs;
45    int64_t max_data_size;             /* max data size */
46    int64_t max_attr_size;
47    int64_t data_size;                 /* current data size (all jobs running) */
48    int64_t attr_size;
49 };
50
51 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
52 spool_stats_t spool_stats;
53
54 /* 
55  * Header for data spool record */
56 struct spool_hdr {
57    int32_t  FirstIndex;               /* FirstIndex for buffer */
58    int32_t  LastIndex;                /* LastIndex for buffer */
59    uint32_t len;                      /* length of next buffer */
60 };
61
62 enum {
63    RB_EOT = 1,
64    RB_ERROR,
65    RB_OK
66 };
67
68 void list_spool_stats(BSOCK *bs)
69 {
70    char ed1[30], ed2[30];
71    if (spool_stats.data_jobs || spool_stats.max_data_size) {
72       bnet_fsend(bs, "Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n",
73          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
74          spool_stats.total_data_jobs, 
75          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
76    }
77    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
78       bnet_fsend(bs, "Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n",
79          spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1), 
80          spool_stats.total_attr_jobs, 
81          edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
82    }
83 }
84
85 bool begin_data_spool(JCR *jcr)
86 {
87    bool stat = true;
88    if (jcr->spool_data) {
89       Dmsg0(100, "Turning on data spooling\n");
90       jcr->dcr->spool_data = true;
91       stat = open_data_spool_file(jcr);
92       if (stat) {
93          jcr->dcr->spooling = true;
94          Jmsg(jcr, M_INFO, 0, _("Spooling data ...\n"));
95          P(mutex);
96          spool_stats.data_jobs++;
97          V(mutex);
98       }
99    }
100    return stat;
101 }
102
103 bool discard_data_spool(JCR *jcr)
104 {
105    if (jcr->dcr->spooling) {
106       Dmsg0(100, "Data spooling discarded\n");
107       return close_data_spool_file(jcr);
108    }
109    return true;
110 }
111
112 bool commit_data_spool(JCR *jcr)
113 {
114    bool stat;
115    char ec1[40];
116
117    if (jcr->dcr->spooling) {
118       Dmsg0(100, "Committing spooled data\n");
119       Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
120             edit_uint64_with_commas(jcr->dcr->dev->spool_size, ec1));
121       stat = despool_data(jcr->dcr);
122       if (!stat) {
123          Dmsg1(000, "Bad return from despool WroteVol=%d\n", jcr->dcr->WroteVol);
124          close_data_spool_file(jcr);
125          return false;
126       }
127       return close_data_spool_file(jcr);
128    }
129    return true;
130 }
131
132 static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name)
133 {
134    const char *dir;  
135    if (jcr->dcr->dev->device->spool_directory) {
136       dir = jcr->dcr->dev->device->spool_directory;
137    } else {
138       dir = working_directory;
139    }
140    Mmsg(name, "%s/%s.data.spool.%s.%s", dir, my_name, jcr->Job, jcr->device->hdr.name);
141 }
142
143
144 static bool open_data_spool_file(JCR *jcr)
145 {
146    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
147    int spool_fd;
148
149    make_unique_data_spool_filename(jcr, &name);
150    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
151       jcr->dcr->spool_fd = spool_fd;
152       jcr->spool_attributes = true;
153    } else {
154       Jmsg(jcr, M_ERROR, 0, _("Open data spool file %s failed: ERR=%s\n"), name, strerror(errno));
155       free_pool_memory(name);
156       return false;
157    }
158    Dmsg1(100, "Created spool file: %s\n", name);
159    free_pool_memory(name);
160    return true;
161 }
162
163 static bool close_data_spool_file(JCR *jcr)
164 {
165    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
166
167    P(mutex);
168    spool_stats.data_jobs--;
169    spool_stats.total_data_jobs++;
170    if (spool_stats.data_size < jcr->dcr->spool_size) {
171       spool_stats.data_size = 0;
172    } else {
173       spool_stats.data_size -= jcr->dcr->spool_size;
174    }
175    jcr->dcr->spool_size = 0;
176    V(mutex);
177
178    make_unique_data_spool_filename(jcr, &name);
179    close(jcr->dcr->spool_fd);
180    jcr->dcr->spool_fd = -1;
181    jcr->dcr->spooling = false;
182    unlink(name);
183    Dmsg1(100, "Deleted spool file: %s\n", name);
184    free_pool_memory(name);
185    return true;
186 }
187
188 static bool despool_data(DCR *dcr)
189 {
190    DEVICE *rdev;
191    DCR *rdcr;
192    bool ok = true;
193    DEV_BLOCK *block;
194    JCR *jcr = dcr->jcr;
195    int stat;
196
197    Dmsg0(100, "Despooling data\n");
198    dcr->spooling = false;
199    lock_device(dcr->dev);
200    dcr->dev_locked = true; 
201
202    /* Setup a dev structure to read */
203    rdev = (DEVICE *)malloc(sizeof(DEVICE));
204    memset(rdev, 0, sizeof(DEVICE));
205    rdev->dev_name = get_memory(strlen("spool")+1);
206    strcpy(rdev->dev_name, "spool");
207    rdev->errmsg = get_pool_memory(PM_EMSG);
208    *rdev->errmsg = 0;
209    rdev->max_block_size = dcr->dev->max_block_size;
210    rdev->min_block_size = dcr->dev->min_block_size;
211    rdev->device = dcr->dev->device;
212    rdcr = new_dcr(NULL, rdev);
213    rdcr->spool_fd = dcr->spool_fd; 
214    rdcr->jcr = jcr;                   /* set a valid jcr */
215    block = rdcr->block;
216    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
217    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
218
219    for ( ; ok; ) {
220       if (job_canceled(jcr)) {
221          ok = false;
222          break;
223       }
224       stat = read_block_from_spool_file(rdcr, block);
225       if (stat == RB_EOT) {
226          break;
227       } else if (stat == RB_ERROR) {
228          ok = false;
229          break;
230       }
231       ok = write_block_to_device(dcr, block);
232       Dmsg3(100, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
233    }
234
235    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
236    if (ftruncate(rdcr->spool_fd, 0) != 0) {
237       Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file error. ERR=%s\n"), 
238          strerror(errno));
239       Dmsg1(000, "Bad return from ftruncate. ERR=%s\n", strerror(errno));
240       ok = false;
241    }
242
243    P(mutex);
244    if (spool_stats.data_size < dcr->spool_size) {
245       spool_stats.data_size = 0;
246    } else {
247       spool_stats.data_size -= dcr->spool_size;
248    }
249    V(mutex);
250    P(dcr->dev->spool_mutex);
251    dcr->dev->spool_size -= dcr->spool_size;
252    dcr->spool_size = 0;               /* zap size in input dcr */
253    V(dcr->dev->spool_mutex);
254    free_memory(rdev->dev_name);
255    free_pool_memory(rdev->errmsg);
256    free(rdev);
257    rdcr->jcr = NULL;
258    free_dcr(rdcr);
259    unlock_device(dcr->dev);
260    dcr->dev_locked = false;
261    dcr->spooling = true;           /* turn on spooling again */
262    return ok;
263 }
264
265 /*
266  * Read a block from the spool file
267  * 
268  *  Returns RB_OK on success
269  *          RB_EOT when file done
270  *          RB_ERROR on error
271  */
272 static int read_block_from_spool_file(DCR *dcr, DEV_BLOCK *block)
273 {
274    uint32_t rlen;
275    ssize_t stat;
276    spool_hdr hdr;
277
278    rlen = sizeof(hdr);
279    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
280    if (stat == 0) {
281       Dmsg0(100, "EOT on spool read.\n");
282       return RB_EOT;
283    } else if (stat != (ssize_t)rlen) {
284       if (stat == -1) {
285          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"), strerror(errno));
286       } else {
287          Dmsg2(000, "Spool read error. Wanted %u bytes, got %u\n", rlen, stat);
288          Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %u\n"), rlen, stat);
289       }
290       return RB_ERROR;
291    }
292    rlen = hdr.len;
293    if (rlen > block->buf_len) {
294       Dmsg2(000, "Spool block too big. Max %u bytes, got %u\n", block->buf_len, rlen);
295       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
296       return RB_ERROR;
297    }
298    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
299    if (stat != (ssize_t)rlen) {
300       Dmsg2(000, "Spool data read error. Wanted %u bytes, got %u\n", rlen, stat);
301       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %u\n"), rlen, stat);
302       return RB_ERROR;
303    }
304    /* Setup write pointers */
305    block->binbuf = rlen;
306    block->bufp = block->buf + block->binbuf;
307    block->FirstIndex = hdr.FirstIndex;
308    block->LastIndex = hdr.LastIndex;
309    block->VolSessionId = dcr->jcr->VolSessionId;
310    block->VolSessionTime = dcr->jcr->VolSessionTime;
311    Dmsg2(100, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
312    return RB_OK;
313 }
314
315 /*
316  * Write a block to the spool file
317  *
318  *  Returns: true on success or EOT
319  *           false on hard error
320  */
321 bool write_block_to_spool_file(DCR *dcr, DEV_BLOCK *block)
322 {
323    ssize_t stat = 0;
324    uint32_t wlen, hlen;               /* length to write */
325    int retry = 0;
326    spool_hdr hdr;   
327    bool despool = false;
328
329    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
330    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
331       return true;
332    }
333
334    hlen = sizeof(hdr);
335    wlen = block->binbuf;
336    P(dcr->dev->spool_mutex);
337    dcr->spool_size += hlen + wlen;
338    dcr->dev->spool_size += hlen + wlen;
339    if ((dcr->max_spool_size > 0 && dcr->spool_size >= dcr->max_spool_size) ||
340        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
341       despool = true;
342    }
343    V(dcr->dev->spool_mutex);
344    P(mutex);
345    spool_stats.data_size += hlen + wlen;
346    if (spool_stats.data_size > spool_stats.max_data_size) {
347       spool_stats.max_data_size = spool_stats.data_size;
348    }
349    V(mutex);
350    if (despool) {
351       char ec1[30];
352 #ifdef xDEBUG 
353       char ec2[30], ec3[30], ec4[30];
354       Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
355             "max_job_size=%s job_size=%s\n", 
356             edit_uint64_with_commas(dcr->max_spool_size, ec1),
357             edit_uint64_with_commas(dcr->spool_size, ec2),
358             edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
359             edit_uint64_with_commas(dcr->dev->spool_size, ec4));
360 #endif
361       Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached. Despooling %s bytes ...\n"),
362             edit_uint64_with_commas(dcr->dev->spool_size, ec1));
363       if (!despool_data(dcr)) {
364          Dmsg0(000, "Bad return from despool in write_block.\n");
365          return false;
366       }
367       /* Despooling cleared these variables so reset them */
368       P(dcr->dev->spool_mutex);
369       dcr->spool_size += hlen + wlen;
370       dcr->dev->spool_size += hlen + wlen;
371       V(dcr->dev->spool_mutex);
372       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
373    }  
374
375    hdr.FirstIndex = block->FirstIndex;
376    hdr.LastIndex = block->LastIndex;
377    hdr.len = block->binbuf;
378
379    /* Write header */
380    for ( ;; ) {
381       stat = write(dcr->spool_fd, (char*)&hdr, (size_t)hlen);
382       if (stat == -1) {
383          Jmsg(dcr->jcr, M_INFO, 0, _("Error writing header to spool file. ERR=%s\n"), strerror(errno));
384       }
385       if (stat != (ssize_t)hlen) {
386          if (!despool_data(dcr)) {
387             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
388             return false;
389          }
390          if (retry++ > 1) {
391             return false;
392          }
393          continue;
394       }
395       break;
396    }
397
398    /* Write data */
399    for ( ;; ) {
400       stat = write(dcr->spool_fd, block->buf, (size_t)wlen);
401       if (stat == -1) {
402          Jmsg(dcr->jcr, M_INFO, 0, _("Error writing data to spool file. ERR=%s\n"), strerror(errno));
403       }
404       if (stat != (ssize_t)wlen) {
405          if (!despool_data(dcr)) {
406             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
407             return false;
408          }
409          if (retry++ > 1) {
410             return false;
411          }
412          continue;
413       }
414       break;
415    }
416    Dmsg2(100, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
417
418    empty_block(block);
419    return true;
420 }
421
422
423 bool are_attributes_spooled(JCR *jcr)
424 {
425    return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
426 }
427
428 /* 
429  * Create spool file for attributes.
430  *  This is done by "attaching" to the bsock, and when
431  *  it is called, the output is written to a file.
432  *  The actual spooling is turned on and off in
433  *  append.c only during writing of the attributes.
434  */
435 bool begin_attribute_spool(JCR *jcr)
436 {
437    if (!jcr->no_attributes && jcr->spool_attributes) {
438       return open_attr_spool_file(jcr, jcr->dir_bsock);
439    }
440    return true;
441 }
442
443 bool discard_attribute_spool(JCR *jcr)
444 {
445    if (are_attributes_spooled(jcr)) {
446       return close_attr_spool_file(jcr, jcr->dir_bsock);
447    }
448    return true;
449 }
450
451 static void update_attr_spool_size(ssize_t size)
452 {
453    P(mutex);
454    if (size > 0) {
455      if ((spool_stats.attr_size - size) > 0) {
456         spool_stats.attr_size -= size;
457      } else {
458         spool_stats.attr_size = 0;
459      }
460    }
461    V(mutex);
462 }
463
464 bool commit_attribute_spool(JCR *jcr)
465 {
466    ssize_t size;
467    char ec1[30];
468
469    if (are_attributes_spooled(jcr)) {
470       fseek(jcr->dir_bsock->spool_fd, 0, SEEK_END);
471       size = ftell(jcr->dir_bsock->spool_fd);
472       P(mutex);
473       if (size > 0) {
474         if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
475            spool_stats.max_attr_size = spool_stats.attr_size + size;
476         } 
477       }
478       spool_stats.attr_size += size;
479       V(mutex);
480       Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to DIR. Despooling %s bytes ...\n"),
481             edit_uint64_with_commas(size, ec1));
482       bnet_despool_to_bsock(jcr->dir_bsock, update_attr_spool_size, size);
483       return close_attr_spool_file(jcr, jcr->dir_bsock);
484    }
485    return true;
486 }
487
488 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
489 {
490    Mmsg(name, "%s/%s.attr.spool.%s.%d", working_directory, my_name,
491       jcr->Job, fd);
492 }
493
494
495 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
496 {
497    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
498
499    make_unique_spool_filename(jcr, &name, bs->fd);
500    bs->spool_fd = fopen(mp_chr(name), "w+");
501    if (!bs->spool_fd) {
502       Jmsg(jcr, M_ERROR, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name, strerror(errno));
503       free_pool_memory(name);
504       return false;
505    }
506    P(mutex);
507    spool_stats.attr_jobs++;
508    V(mutex);
509    free_pool_memory(name);
510    return true;
511 }
512
513 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
514 {
515    POOLMEM *name;
516     
517    if (!bs->spool_fd) {
518       return true;
519    }
520    name = get_pool_memory(PM_MESSAGE);
521    P(mutex);
522    spool_stats.attr_jobs--;
523    spool_stats.total_attr_jobs++;
524    V(mutex);
525    make_unique_spool_filename(jcr, &name, bs->fd);
526    fclose(bs->spool_fd);
527    unlink(mp_chr(name));
528    free_pool_memory(name);
529    bs->spool_fd = NULL;
530    bs->spool = false;
531    return true;
532 }