]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
ad1e4c0b059d8fa7404061c3d00e06c60527326e
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2  *  Spooling code 
3  *
4  *      Kern Sibbald, March 2004
5  *
6  *  Version $Id$
7  */
8 /*
9    Copyright (C) 2000-2004 Kern Sibbald and John Walker
10
11    This program is free software; you can redistribute it and/or
12    modify it under the terms of the GNU General Public License as
13    published by the Free Software Foundation; either version 2 of
14    the License, or (at your option) any later version.
15
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19    General Public License for more details.
20
21    You should have received a copy of the GNU General Public
22    License along with this program; if not, write to the Free
23    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
24    MA 02111-1307, USA.
25
26  */
27
28 #include "bacula.h"
29 #include "stored.h"
30
31 /* Forward referenced subroutines */
32 static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name);
33 static bool open_data_spool_file(JCR *jcr);
34 static bool close_data_spool_file(JCR *jcr);
35 static bool despool_data(DCR *dcr);
36 static int  read_block_from_spool_file(DCR *dcr, DEV_BLOCK *block);
37
38 struct spool_stats_t {
39    uint32_t data_jobs;                /* current jobs spooling data */
40    uint32_t attr_jobs;                
41    uint32_t total_data_jobs;          /* total jobs to have spooled data */
42    uint32_t total_attr_jobs;
43    uint64_t max_data_size;            /* max data size */
44    uint64_t max_attr_size;
45    uint64_t data_size;                /* current data size (all jobs running) */
46    uint64_t attr_size;
47 };
48
49 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
50 spool_stats_t spool_stats;
51
52 /* 
53  * Header for data spool record */
54 struct spool_hdr {
55    int32_t  FirstIndex;               /* FirstIndex for buffer */
56    int32_t  LastIndex;                /* LastIndex for buffer */
57    uint32_t len;                      /* length of next buffer */
58 };
59
60 enum {
61    RB_EOT = 1,
62    RB_ERROR,
63    RB_OK
64 };
65
66 void list_spool_stats(BSOCK *bs)
67 {
68    char ed1[30], ed2[30];
69    if (spool_stats.data_jobs || spool_stats.max_data_size) {
70       bnet_fsend(bs, "Data spooling: %d active jobs, %s bytes; %d total jobs, %s max bytes/job.\n",
71          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
72          spool_stats.total_data_jobs, 
73          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
74    }
75    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
76       bnet_fsend(bs, "Attr spooling: %d active jobs; %d total jobs, %s max bytes/job.\n",
77          spool_stats.attr_jobs, spool_stats.total_attr_jobs, 
78          edit_uint64_with_commas(spool_stats.max_attr_size, ed1));
79    }
80 }
81
82 bool begin_data_spool(JCR *jcr)
83 {
84    bool stat = true;
85    if (jcr->spool_data) {
86       Dmsg0(100, "Turning on data spooling\n");
87       jcr->dcr->spool_data = true;
88       stat = open_data_spool_file(jcr);
89       if (stat) {
90          jcr->dcr->spooling = true;
91          Jmsg(jcr, M_INFO, 0, _("Spooling data ...\n"));
92          P(mutex);
93          spool_stats.data_jobs++;
94          V(mutex);
95       }
96    }
97    return stat;
98 }
99
100 bool discard_data_spool(JCR *jcr)
101 {
102    if (jcr->dcr->spooling) {
103       Dmsg0(100, "Data spooling discarded\n");
104       return close_data_spool_file(jcr);
105    }
106    return true;
107 }
108
109 bool commit_data_spool(JCR *jcr)
110 {
111    bool stat;
112    char ec1[40];
113
114    if (jcr->dcr->spooling) {
115       Dmsg0(100, "Committing spooled data\n");
116       Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
117             edit_uint64_with_commas(jcr->dcr->dev->spool_size, ec1));
118       stat = despool_data(jcr->dcr);
119       if (!stat) {
120          Dmsg1(000, "Bad return from despool WroteVol=%d\n", jcr->dcr->WroteVol);
121          close_data_spool_file(jcr);
122          return false;
123       }
124       return close_data_spool_file(jcr);
125    }
126    return true;
127 }
128
129 static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name)
130 {
131    char *dir;  
132    if (jcr->dcr->dev->device->spool_directory) {
133       dir = jcr->dcr->dev->device->spool_directory;
134    } else {
135       dir = working_directory;
136    }
137    Mmsg(name, "%s/%s.data.spool.%s.%s", dir, my_name, jcr->Job, jcr->device->hdr.name);
138 }
139
140
141 static bool open_data_spool_file(JCR *jcr)
142 {
143    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
144    int spool_fd;
145
146    make_unique_data_spool_filename(jcr, &name);
147    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
148       jcr->dcr->spool_fd = spool_fd;
149       jcr->spool_attributes = true;
150    } else {
151       Jmsg(jcr, M_ERROR, 0, _("Open data spool file %s failed: ERR=%s\n"), name, strerror(errno));
152       free_pool_memory(name);
153       return false;
154    }
155    Dmsg1(100, "Created spool file: %s\n", name);
156    free_pool_memory(name);
157    return true;
158 }
159
160 static bool close_data_spool_file(JCR *jcr)
161 {
162    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
163
164    P(mutex);
165    spool_stats.data_jobs--;
166    spool_stats.total_data_jobs++;
167    if (spool_stats.data_size < jcr->dcr->spool_size) {
168       spool_stats.data_size = 0;
169    } else {
170       spool_stats.data_size -= jcr->dcr->spool_size;
171    }
172    jcr->dcr->spool_size = 0;
173    V(mutex);
174
175    make_unique_data_spool_filename(jcr, &name);
176    close(jcr->dcr->spool_fd);
177    jcr->dcr->spool_fd = -1;
178    jcr->dcr->spooling = false;
179    unlink(name);
180    Dmsg1(100, "Deleted spool file: %s\n", name);
181    free_pool_memory(name);
182    return true;
183 }
184
185 static bool despool_data(DCR *dcr)
186 {
187    DEVICE *rdev;
188    DCR *rdcr;
189    bool ok = true;
190    DEV_BLOCK *block;
191    JCR *jcr = dcr->jcr;
192    int stat;
193
194    Dmsg0(100, "Despooling data\n");
195    dcr->spooling = false;
196    lock_device(dcr->dev);
197    dcr->dev_locked = true; 
198
199    /* Setup a dev structure to read */
200    rdev = (DEVICE *)malloc(sizeof(DEVICE));
201    memset(rdev, 0, sizeof(DEVICE));
202    rdev->dev_name = get_memory(strlen("spool")+1);
203    strcpy(rdev->dev_name, "spool");
204    rdev->errmsg = get_pool_memory(PM_EMSG);
205    *rdev->errmsg = 0;
206    rdev->max_block_size = dcr->dev->max_block_size;
207    rdev->min_block_size = dcr->dev->min_block_size;
208    rdev->device = dcr->dev->device;
209    rdcr = new_dcr(NULL, rdev);
210    rdcr->spool_fd = dcr->spool_fd; 
211    rdcr->jcr = jcr;                   /* set a valid jcr */
212    block = rdcr->block;
213    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
214    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
215
216    for ( ; ok; ) {
217       if (job_canceled(jcr)) {
218          ok = false;
219          break;
220       }
221       stat = read_block_from_spool_file(rdcr, block);
222       if (stat == RB_EOT) {
223          break;
224       } else if (stat == RB_ERROR) {
225          ok = false;
226          break;
227       }
228       ok = write_block_to_device(dcr, block);
229       Dmsg3(100, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
230    }
231
232    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
233    if (ftruncate(rdcr->spool_fd, 0) != 0) {
234       Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file error. ERR=%s\n"), 
235          strerror(errno));
236       Dmsg1(000, "Bad return from ftruncate. ERR=%s\n", strerror(errno));
237       ok = false;
238    }
239
240    P(mutex);
241    if (spool_stats.data_size < dcr->spool_size) {
242       spool_stats.data_size = 0;
243    } else {
244       spool_stats.data_size -= dcr->spool_size;
245    }
246    V(mutex);
247    P(dcr->dev->spool_mutex);
248    dcr->dev->spool_size -= dcr->spool_size;
249    dcr->spool_size = 0;               /* zap size in input dcr */
250    V(dcr->dev->spool_mutex);
251    free_memory(rdev->dev_name);
252    free_pool_memory(rdev->errmsg);
253    free(rdev);
254    rdcr->jcr = NULL;
255    free_dcr(rdcr);
256    unlock_device(dcr->dev);
257    dcr->dev_locked = false;
258    dcr->spooling = true;           /* turn on spooling again */
259    return ok;
260 }
261
262 /*
263  * Read a block from the spool file
264  * 
265  *  Returns RB_OK on success
266  *          RB_EOT when file done
267  *          RB_ERROR on error
268  */
269 static int read_block_from_spool_file(DCR *dcr, DEV_BLOCK *block)
270 {
271    uint32_t rlen;
272    ssize_t stat;
273    spool_hdr hdr;
274
275    rlen = sizeof(hdr);
276    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
277    if (stat == 0) {
278       Dmsg0(100, "EOT on spool read.\n");
279       return RB_EOT;
280    } else if (stat != (ssize_t)rlen) {
281       if (stat == -1) {
282          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"), strerror(errno));
283       } else {
284          Dmsg2(000, "Spool read error. Wanted %u bytes, got %u\n", rlen, stat);
285          Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %u\n"), rlen, stat);
286       }
287       return RB_ERROR;
288    }
289    rlen = hdr.len;
290    if (rlen > block->buf_len) {
291       Dmsg2(000, "Spool block too big. Max %u bytes, got %u\n", block->buf_len, rlen);
292       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
293       return RB_ERROR;
294    }
295    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
296    if (stat != (ssize_t)rlen) {
297       Dmsg2(000, "Spool data read error. Wanted %u bytes, got %u\n", rlen, stat);
298       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %u\n"), rlen, stat);
299       return RB_ERROR;
300    }
301    /* Setup write pointers */
302    block->binbuf = rlen;
303    block->bufp = block->buf + block->binbuf;
304    block->FirstIndex = hdr.FirstIndex;
305    block->LastIndex = hdr.LastIndex;
306    block->VolSessionId = dcr->jcr->VolSessionId;
307    block->VolSessionTime = dcr->jcr->VolSessionTime;
308    Dmsg2(100, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
309    return RB_OK;
310 }
311
312 /*
313  * Write a block to the spool file
314  *
315  *  Returns: true on success or EOT
316  *           false on hard error
317  */
318 bool write_block_to_spool_file(DCR *dcr, DEV_BLOCK *block)
319 {
320    ssize_t stat = 0;
321    uint32_t wlen, hlen;               /* length to write */
322    int retry = 0;
323    spool_hdr hdr;   
324    bool despool = false;
325
326    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
327    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
328       return true;
329    }
330
331    hlen = sizeof(hdr);
332    wlen = block->binbuf;
333    P(dcr->dev->spool_mutex);
334    dcr->spool_size += hlen + wlen;
335    dcr->dev->spool_size += hlen + wlen;
336    if ((dcr->max_spool_size > 0 && dcr->spool_size >= dcr->max_spool_size) ||
337        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
338       despool = true;
339    }
340    V(dcr->dev->spool_mutex);
341    P(mutex);
342    spool_stats.data_size += hlen + wlen;
343    if (spool_stats.data_size > spool_stats.max_data_size) {
344       spool_stats.max_data_size = spool_stats.data_size;
345    }
346    V(mutex);
347    if (despool) {
348       char ec1[30];
349 #ifdef xDEBUG 
350       char ec2[30], ec3[30], ec4[30];
351       Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
352             "max_job_size=%s job_size=%s\n", 
353             edit_uint64_with_commas(dcr->max_spool_size, ec1),
354             edit_uint64_with_commas(dcr->spool_size, ec2),
355             edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
356             edit_uint64_with_commas(dcr->dev->spool_size, ec4));
357 #endif
358       Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached. Despooling %s bytes ...\n"),
359             edit_uint64_with_commas(dcr->dev->spool_size, ec1));
360       if (!despool_data(dcr)) {
361          Dmsg0(000, "Bad return from despool in write_block.\n");
362          return false;
363       }
364       /* Despooling cleared these variables so reset them */
365       P(dcr->dev->spool_mutex);
366       dcr->spool_size += hlen + wlen;
367       dcr->dev->spool_size += hlen + wlen;
368       V(dcr->dev->spool_mutex);
369       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
370    }  
371
372    hdr.FirstIndex = block->FirstIndex;
373    hdr.LastIndex = block->LastIndex;
374    hdr.len = block->binbuf;
375
376    /* Write header */
377    for ( ;; ) {
378       stat = write(dcr->spool_fd, (char*)&hdr, (size_t)hlen);
379       if (stat == -1) {
380          Jmsg(dcr->jcr, M_INFO, 0, _("Error writing header to spool file. ERR=%s\n"), strerror(errno));
381       }
382       if (stat != (ssize_t)hlen) {
383          if (!despool_data(dcr)) {
384             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
385             return false;
386          }
387          if (retry++ > 1) {
388             return false;
389          }
390          continue;
391       }
392       break;
393    }
394
395    /* Write data */
396    for ( ;; ) {
397       stat = write(dcr->spool_fd, block->buf, (size_t)wlen);
398       if (stat == -1) {
399          Jmsg(dcr->jcr, M_INFO, 0, _("Error writing data to spool file. ERR=%s\n"), strerror(errno));
400       }
401       if (stat != (ssize_t)wlen) {
402          if (!despool_data(dcr)) {
403             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
404             return false;
405          }
406          if (retry++ > 1) {
407             return false;
408          }
409          continue;
410       }
411       break;
412    }
413    Dmsg2(100, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
414
415    empty_block(block);
416    return true;
417 }
418
419
420 bool are_attributes_spooled(JCR *jcr)
421 {
422    return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
423 }
424
425 /* 
426  * Create spool file for attributes.
427  *  This is done by "attaching" to the bsock, and when
428  *  it is called, the output is written to a file.
429  *  The actual spooling is turned on and off in
430  *  append.c only during writing of the attributes.
431  */
432 bool begin_attribute_spool(JCR *jcr)
433 {
434    if (!jcr->no_attributes && jcr->spool_attributes) {
435       return open_spool_file(jcr, jcr->dir_bsock);
436    }
437    return true;
438 }
439
440 bool discard_attribute_spool(JCR *jcr)
441 {
442    if (are_attributes_spooled(jcr)) {
443       return close_spool_file(jcr, jcr->dir_bsock);
444    }
445    return true;
446 }
447
448 bool commit_attribute_spool(JCR *jcr)
449 {
450    if (are_attributes_spooled(jcr)) {
451       bnet_despool_to_bsock(jcr->dir_bsock);
452       return close_spool_file(jcr, jcr->dir_bsock);
453    }
454    return true;
455 }
456
457 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
458 {
459    Mmsg(name, "%s/%s.attr.spool.%s.%d", working_directory, my_name,
460       jcr->Job, fd);
461 }
462
463 bool open_spool_file(JCR *jcr, BSOCK *bs)
464 {
465     POOLMEM *name  = get_pool_memory(PM_MESSAGE);
466
467     make_unique_spool_filename(jcr, &name, bs->fd);
468     bs->spool_fd = fopen(mp_chr(name), "w+");
469     if (!bs->spool_fd) {
470        Jmsg(jcr, M_ERROR, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name, strerror(errno));
471        free_pool_memory(name);
472        return false;
473     }
474     P(mutex);
475     spool_stats.attr_jobs++;
476     V(mutex);
477     free_pool_memory(name);
478     return true;
479 }
480
481 bool close_spool_file(JCR *jcr, BSOCK *bs)
482 {
483     POOLMEM *name  = get_pool_memory(PM_MESSAGE);
484     ssize_t size;
485      
486     fseek(bs->spool_fd, 0, SEEK_END);
487     size = ftell(bs->spool_fd);
488     P(mutex);
489     if (size > 0) {
490        if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
491           spool_stats.max_attr_size = spool_stats.attr_size + size;
492        } 
493     }
494     spool_stats.attr_jobs--;
495     spool_stats.total_attr_jobs++;
496     V(mutex);
497     make_unique_spool_filename(jcr, &name, bs->fd);
498     fclose(bs->spool_fd);
499     unlink(mp_chr(name));
500     free_pool_memory(name);
501     bs->spool_fd = NULL;
502     bs->spool = false;
503     return true;
504 }