]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
Implement selecting another Volume if busy + cleanup spool messages
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2  *  Spooling code 
3  *
4  *      Kern Sibbald, March 2004
5  *
6  *  Version $Id$
7  */
8 /*
9    Copyright (C) 2000-2004 Kern Sibbald and John Walker
10
11    This program is free software; you can redistribute it and/or
12    modify it under the terms of the GNU General Public License as
13    published by the Free Software Foundation; either version 2 of
14    the License, or (at your option) any later version.
15
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19    General Public License for more details.
20
21    You should have received a copy of the GNU General Public
22    License along with this program; if not, write to the Free
23    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
24    MA 02111-1307, USA.
25
26  */
27
28 #include "bacula.h"
29 #include "stored.h"
30
31 /* Forward referenced subroutines */
32 static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name);
33 static bool open_data_spool_file(JCR *jcr);
34 static bool close_data_spool_file(JCR *jcr);
35 static bool despool_data(DCR *dcr);
36 static int  read_block_from_spool_file(DCR *dcr, DEV_BLOCK *block);
37
38 struct spool_stats_t {
39    uint32_t data_jobs;                /* current jobs spooling data */
40    uint32_t attr_jobs;                
41    uint32_t total_data_jobs;          /* total jobs to have spooled data */
42    uint32_t total_attr_jobs;
43    uint64_t max_data_size;            /* max data size */
44    uint64_t max_attr_size;
45    uint64_t data_size;                /* current data size (all jobs running) */
46    uint64_t attr_size;
47 };
48
49 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
50 spool_stats_t spool_stats;
51
52 /* 
53  * Header for data spool record */
54 struct spool_hdr {
55    int32_t  FirstIndex;               /* FirstIndex for buffer */
56    int32_t  LastIndex;                /* LastIndex for buffer */
57    uint32_t len;                      /* length of next buffer */
58 };
59
60 enum {
61    RB_EOT = 1,
62    RB_ERROR,
63    RB_OK
64 };
65
66 void list_spool_stats(BSOCK *bs)
67 {
68    char ed1[30], ed2[30];
69    bnet_fsend(bs, "Data spooling: %d active jobs %s bytes; %d total jobs %s max bytes/job.\n",
70       spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
71       spool_stats.total_data_jobs, edit_uint64_with_commas(spool_stats.max_data_size, ed2));
72    bnet_fsend(bs, "Attr spooling: %d active jobs; %d total jobs %s max bytes/job.\n",
73       spool_stats.attr_jobs, 
74       spool_stats.total_attr_jobs, edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
75 }
76
77 bool begin_data_spool(JCR *jcr)
78 {
79    bool stat = true;
80    if (jcr->spool_data) {
81       Dmsg0(100, "Turning on data spooling\n");
82       jcr->dcr->spool_data = true;
83       stat = open_data_spool_file(jcr);
84       if (stat) {
85          jcr->dcr->spooling = true;
86          Jmsg(jcr, M_INFO, 0, _("Spooling data ...\n"));
87          P(mutex);
88          spool_stats.data_jobs++;
89          V(mutex);
90       }
91    }
92    return stat;
93 }
94
95 bool discard_data_spool(JCR *jcr)
96 {
97    if (jcr->dcr->spooling) {
98       Dmsg0(100, "Data spooling discarded\n");
99       return close_data_spool_file(jcr);
100    }
101    return true;
102 }
103
104 bool commit_data_spool(JCR *jcr)
105 {
106    bool stat;
107    if (jcr->dcr->spooling) {
108       Dmsg0(100, "Committing spooled data\n");
109       stat = despool_data(jcr->dcr);
110       if (!stat) {
111          Dmsg1(000, "Bad return from despool WroteVol=%d\n", jcr->dcr->WroteVol);
112          close_data_spool_file(jcr);
113          return false;
114       }
115       return close_data_spool_file(jcr);
116    }
117    return true;
118 }
119
120 static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name)
121 {
122    char *dir;  
123    if (jcr->dcr->dev->device->spool_directory) {
124       dir = jcr->dcr->dev->device->spool_directory;
125    } else {
126       dir = working_directory;
127    }
128    Mmsg(name, "%s/%s.data.spool.%s.%s", dir, my_name, jcr->Job, jcr->device->hdr.name);
129 }
130
131
132 static bool open_data_spool_file(JCR *jcr)
133 {
134    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
135    int spool_fd;
136
137    make_unique_data_spool_filename(jcr, &name);
138    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
139       jcr->dcr->spool_fd = spool_fd;
140       jcr->spool_attributes = true;
141    } else {
142       Jmsg(jcr, M_ERROR, 0, _("Open data spool file %s failed: ERR=%s\n"), name, strerror(errno));
143       free_pool_memory(name);
144       return false;
145     }
146     Dmsg1(100, "Created spool file: %s\n", name);
147     free_pool_memory(name);
148     return true;
149 }
150
151 static bool close_data_spool_file(JCR *jcr)
152 {
153     POOLMEM *name  = get_pool_memory(PM_MESSAGE);
154
155     P(mutex);
156     spool_stats.data_jobs--;
157     spool_stats.total_data_jobs++;
158     spool_stats.data_size -= jcr->dcr->spool_size;
159     V(mutex);
160
161     make_unique_data_spool_filename(jcr, &name);
162     close(jcr->dcr->spool_fd);
163     jcr->dcr->spool_fd = -1;
164     jcr->dcr->spooling = false;
165     unlink(name);
166     Dmsg1(100, "Deleted spool file: %s\n", name);
167     free_pool_memory(name);
168     return true;
169 }
170
171 static bool despool_data(DCR *dcr)
172 {
173    DEVICE *rdev;
174    DCR *rdcr;
175    bool ok = true;
176    DEV_BLOCK *block;
177    JCR *jcr = dcr->jcr;
178    int stat;
179
180    Dmsg0(100, "Despooling data\n");
181    dcr->spooling = false;
182    lock_device(dcr->dev);
183    dcr->dev_locked = true; 
184    /* Set up a dev structure to read */
185    rdev = (DEVICE *)malloc(sizeof(DEVICE));
186    memset(rdev, 0, sizeof(DEVICE));
187    rdev->dev_name = get_memory(strlen("spool")+1);
188    strcpy(rdev->dev_name, "spool");
189    rdev->errmsg = get_pool_memory(PM_EMSG);
190    *rdev->errmsg = 0;
191    rdev->device = dcr->dev->device;
192    rdcr = new_dcr(NULL, rdev);
193    rdcr->spool_fd = dcr->spool_fd; 
194    rdcr->jcr = jcr;                   /* set a valid jcr */
195    block = rdcr->block;
196    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
197
198    for ( ; ok; ) {
199       if (job_canceled(jcr)) {
200          ok = false;
201          break;
202       }
203       stat = read_block_from_spool_file(rdcr, block);
204       if (stat == RB_EOT) {
205          break;
206       } else if (stat == RB_ERROR) {
207          ok = false;
208          break;
209       }
210       ok = write_block_to_device(dcr, block);
211       Dmsg3(100, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
212    }
213
214    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
215    if (ftruncate(rdcr->spool_fd, 0) != 0) {
216       Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file error. ERR=%s\n"), 
217          strerror(errno));
218       Dmsg1(000, "Bad return from ftruncate. ERR=%s\n", strerror(errno));
219       ok = false;
220    }
221
222    P(mutex);
223    spool_stats.data_size -= dcr->spool_size;
224    V(mutex);
225    P(dcr->dev->spool_mutex);
226    dcr->dev->spool_size -= dcr->spool_size;
227    dcr->spool_size = 0;               /* zap size in input dcr */
228    V(dcr->dev->spool_mutex);
229    free_memory(rdev->dev_name);
230    free_pool_memory(rdev->errmsg);
231    free(rdev);
232    rdcr->jcr = NULL;
233    free_dcr(rdcr);
234    unlock_device(dcr->dev);
235    dcr->dev_locked = false;
236    dcr->spooling = true;           /* turn on spooling again */
237    return ok;
238 }
239
240 /*
241  * Read a block from the spool file
242  * 
243  *  Returns RB_OK on success
244  *          RB_EOT when file done
245  *          RB_ERROR on error
246  */
247 static int read_block_from_spool_file(DCR *dcr, DEV_BLOCK *block)
248 {
249    uint32_t rlen;
250    ssize_t stat;
251    spool_hdr hdr;
252
253    rlen = sizeof(hdr);
254    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
255    if (stat == 0) {
256       Dmsg0(100, "EOT on spool read.\n");
257       return RB_EOT;
258    } else if (stat != (ssize_t)rlen) {
259       if (stat == -1) {
260          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"), strerror(errno));
261       } else {
262          Dmsg2(000, "Spool read error. Wanted %u bytes, got %u\n", rlen, stat);
263          Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %u\n"), rlen, stat);
264       }
265       return RB_ERROR;
266    }
267    rlen = hdr.len;
268    if (rlen > block->buf_len) {
269       Dmsg2(000, "Spool block too big. Max %u bytes, got %u\n", block->buf_len, rlen);
270       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
271       return RB_ERROR;
272    }
273    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
274    if (stat != (ssize_t)rlen) {
275       Dmsg2(000, "Spool data read error. Wanted %u bytes, got %u\n", rlen, stat);
276       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %u\n"), rlen, stat);
277       return RB_ERROR;
278    }
279    /* Setup write pointers */
280    block->binbuf = rlen;
281    block->bufp = block->buf + block->binbuf;
282    block->FirstIndex = hdr.FirstIndex;
283    block->LastIndex = hdr.LastIndex;
284    block->VolSessionId = dcr->jcr->VolSessionId;
285    block->VolSessionTime = dcr->jcr->VolSessionTime;
286    Dmsg2(100, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
287    return RB_OK;
288 }
289
290 /*
291  * Write a block to the spool file
292  *
293  *  Returns: true on success or EOT
294  *           false on hard error
295  */
296 bool write_block_to_spool_file(DCR *dcr, DEV_BLOCK *block)
297 {
298    ssize_t stat = 0;
299    uint32_t wlen, hlen;               /* length to write */
300    int retry = 0;
301    spool_hdr hdr;   
302    bool despool = false;
303
304    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
305    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
306       return true;
307    }
308
309    hlen = sizeof(hdr);
310    wlen = block->binbuf;
311    P(dcr->dev->spool_mutex);
312    dcr->spool_size += hlen + wlen;
313    dcr->dev->spool_size += hlen + wlen;
314    if ((dcr->max_spool_size > 0 && dcr->spool_size >= dcr->max_spool_size) ||
315        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
316       despool = true;
317    }
318    V(dcr->dev->spool_mutex);
319    P(mutex);
320    spool_stats.data_size += hlen + wlen;
321    if (spool_stats.data_size > spool_stats.max_data_size) {
322       spool_stats.max_data_size = spool_stats.data_size;
323    }
324    V(mutex);
325    if (despool) {
326       char ec1[30], ec2[30], ec3[30], ec4[30];
327       Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
328             "max_job_size=%s job_size=%s\n", 
329             edit_uint64_with_commas(dcr->max_spool_size, ec1),
330             edit_uint64_with_commas(dcr->spool_size, ec2),
331             edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
332             edit_uint64_with_commas(dcr->dev->spool_size, ec4));
333       Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached. Despooling ...\n"));
334       if (!despool_data(dcr)) {
335          Dmsg0(000, "Bad return from despool in write_block.\n");
336          return false;
337       }
338       /* Despooling cleard these variables so reset them */
339       P(dcr->dev->spool_mutex);
340       dcr->spool_size += hlen + wlen;
341       dcr->dev->spool_size += hlen + wlen;
342       V(dcr->dev->spool_mutex);
343       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
344    }  
345
346    hdr.FirstIndex = block->FirstIndex;
347    hdr.LastIndex = block->LastIndex;
348    hdr.len = block->binbuf;
349
350    /* Write header */
351    for ( ;; ) {
352       stat = write(dcr->spool_fd, (char*)&hdr, (size_t)hlen);
353       if (stat == -1) {
354          Jmsg(dcr->jcr, M_INFO, 0, _("Error writing header to spool file. ERR=%s\n"), strerror(errno));
355       }
356       if (stat != (ssize_t)hlen) {
357          if (!despool_data(dcr)) {
358             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
359             return false;
360          }
361          if (retry++ > 1) {
362             return false;
363          }
364          continue;
365       }
366       break;
367    }
368
369    /* Write data */
370    for ( ;; ) {
371       stat = write(dcr->spool_fd, block->buf, (size_t)wlen);
372       if (stat == -1) {
373          Jmsg(dcr->jcr, M_INFO, 0, _("Error writing data to spool file. ERR=%s\n"), strerror(errno));
374       }
375       if (stat != (ssize_t)wlen) {
376          if (!despool_data(dcr)) {
377             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
378             return false;
379          }
380          if (retry++ > 1) {
381             return false;
382          }
383          continue;
384       }
385       break;
386    }
387    Dmsg2(100, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
388
389    empty_block(block);
390    return true;
391 }
392
393
394 bool are_attributes_spooled(JCR *jcr)
395 {
396    return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
397 }
398
399 /* 
400  * Create spool file for attributes.
401  *  This is done by "attaching" to the bsock, and when
402  *  it is called, the output is written to a file.
403  *  The actual spooling is turned on and off in
404  *  append.c only during writing of the attributes.
405  */
406 bool begin_attribute_spool(JCR *jcr)
407 {
408    if (!jcr->no_attributes && jcr->spool_attributes) {
409       return open_spool_file(jcr, jcr->dir_bsock);
410    }
411    return true;
412 }
413
414 bool discard_attribute_spool(JCR *jcr)
415 {
416    if (are_attributes_spooled(jcr)) {
417       return close_spool_file(jcr, jcr->dir_bsock);
418    }
419    return true;
420 }
421
422 bool commit_attribute_spool(JCR *jcr)
423 {
424    if (are_attributes_spooled(jcr)) {
425       bnet_despool_to_bsock(jcr->dir_bsock);
426       return close_spool_file(jcr, jcr->dir_bsock);
427    }
428    return true;
429 }
430
431 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
432 {
433    Mmsg(name, "%s/%s.attr.spool.%s.%d", working_directory, my_name,
434       jcr->Job, fd);
435 }
436
437 bool open_spool_file(JCR *jcr, BSOCK *bs)
438 {
439     POOLMEM *name  = get_pool_memory(PM_MESSAGE);
440
441     make_unique_spool_filename(jcr, &name, bs->fd);
442     bs->spool_fd = fopen(mp_chr(name), "w+");
443     if (!bs->spool_fd) {
444        Jmsg(jcr, M_ERROR, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name, strerror(errno));
445        free_pool_memory(name);
446        return false;
447     }
448     P(mutex);
449     spool_stats.attr_jobs++;
450     V(mutex);
451     free_pool_memory(name);
452     return true;
453 }
454
455 bool close_spool_file(JCR *jcr, BSOCK *bs)
456 {
457     POOLMEM *name  = get_pool_memory(PM_MESSAGE);
458     ssize_t size;
459      
460     fseek(bs->spool_fd, 0, SEEK_END);
461     size = ftell(bs->spool_fd);
462     P(mutex);
463     if (size > 0) {
464        if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
465           spool_stats.max_attr_size = spool_stats.attr_size + size;
466        } 
467     }
468     spool_stats.attr_jobs--;
469     spool_stats.total_attr_jobs++;
470     V(mutex);
471     make_unique_spool_filename(jcr, &name, bs->fd);
472     fclose(bs->spool_fd);
473     unlink(mp_chr(name));
474     free_pool_memory(name);
475     bs->spool_fd = NULL;
476     bs->spool = false;
477     return true;
478 }