]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/spool.c
added WSACleanup(), corrected WSA_Init() (removed #ifdef)
[bacula/bacula] / bacula / src / stored / spool.c
1 /*
2  *  Spooling code 
3  *
4  *      Kern Sibbald, March 2004
5  *
6  *  Version $Id$
7  */
8 /*
9    Copyright (C) 2000-2004 Kern Sibbald and John Walker
10
11    This program is free software; you can redistribute it and/or
12    modify it under the terms of the GNU General Public License as
13    published by the Free Software Foundation; either version 2 of
14    the License, or (at your option) any later version.
15
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19    General Public License for more details.
20
21    You should have received a copy of the GNU General Public
22    License along with this program; if not, write to the Free
23    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
24    MA 02111-1307, USA.
25
26  */
27
28 #include "bacula.h"
29 #include "stored.h"
30
31 /* Forward referenced subroutines */
32 static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name);
33 static bool open_data_spool_file(JCR *jcr);
34 static bool close_data_spool_file(JCR *jcr);
35 static bool despool_data(DCR *dcr);
36 static int  read_block_from_spool_file(DCR *dcr, DEV_BLOCK *block);
37
38 struct spool_stats_t {
39    uint32_t data_jobs;                /* current jobs spooling data */
40    uint32_t attr_jobs;                
41    uint32_t total_data_jobs;          /* total jobs to have spooled data */
42    uint32_t total_attr_jobs;
43    uint64_t max_data_size;            /* max data size */
44    uint64_t max_attr_size;
45    uint64_t data_size;                /* current data size (all jobs running) */
46    uint64_t attr_size;
47 };
48
49 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
50 spool_stats_t spool_stats;
51
52 /* 
53  * Header for data spool record */
54 struct spool_hdr {
55    int32_t  FirstIndex;               /* FirstIndex for buffer */
56    int32_t  LastIndex;                /* LastIndex for buffer */
57    uint32_t len;                      /* length of next buffer */
58 };
59
60 enum {
61    RB_EOT = 1,
62    RB_ERROR,
63    RB_OK
64 };
65
66 void list_spool_stats(BSOCK *bs)
67 {
68    char ed1[30], ed2[30];
69    bnet_fsend(bs, "Data spooling: %d active jobs %s bytes; %d total jobs %s max bytes/job.\n",
70       spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
71       spool_stats.total_data_jobs, 
72       edit_uint64_with_commas(spool_stats.max_data_size, ed2));
73    bnet_fsend(bs, "Attr spooling: %d active jobs; %d total jobs %s max bytes/job.\n",
74       spool_stats.attr_jobs, spool_stats.total_attr_jobs, 
75       edit_uint64_with_commas(spool_stats.max_attr_size, ed1));
76 }
77
78 bool begin_data_spool(JCR *jcr)
79 {
80    bool stat = true;
81    if (jcr->spool_data) {
82       Dmsg0(100, "Turning on data spooling\n");
83       jcr->dcr->spool_data = true;
84       stat = open_data_spool_file(jcr);
85       if (stat) {
86          jcr->dcr->spooling = true;
87          Jmsg(jcr, M_INFO, 0, _("Spooling data ...\n"));
88          P(mutex);
89          spool_stats.data_jobs++;
90          V(mutex);
91       }
92    }
93    return stat;
94 }
95
96 bool discard_data_spool(JCR *jcr)
97 {
98    if (jcr->dcr->spooling) {
99       Dmsg0(100, "Data spooling discarded\n");
100       return close_data_spool_file(jcr);
101    }
102    return true;
103 }
104
105 bool commit_data_spool(JCR *jcr)
106 {
107    bool stat;
108    if (jcr->dcr->spooling) {
109       Dmsg0(100, "Committing spooled data\n");
110       stat = despool_data(jcr->dcr);
111       if (!stat) {
112          Dmsg1(000, "Bad return from despool WroteVol=%d\n", jcr->dcr->WroteVol);
113          close_data_spool_file(jcr);
114          return false;
115       }
116       return close_data_spool_file(jcr);
117    }
118    return true;
119 }
120
121 static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name)
122 {
123    char *dir;  
124    if (jcr->dcr->dev->device->spool_directory) {
125       dir = jcr->dcr->dev->device->spool_directory;
126    } else {
127       dir = working_directory;
128    }
129    Mmsg(name, "%s/%s.data.spool.%s.%s", dir, my_name, jcr->Job, jcr->device->hdr.name);
130 }
131
132
133 static bool open_data_spool_file(JCR *jcr)
134 {
135    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
136    int spool_fd;
137
138    make_unique_data_spool_filename(jcr, &name);
139    if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
140       jcr->dcr->spool_fd = spool_fd;
141       jcr->spool_attributes = true;
142    } else {
143       Jmsg(jcr, M_ERROR, 0, _("Open data spool file %s failed: ERR=%s\n"), name, strerror(errno));
144       free_pool_memory(name);
145       return false;
146    }
147    Dmsg1(100, "Created spool file: %s\n", name);
148    free_pool_memory(name);
149    return true;
150 }
151
152 static bool close_data_spool_file(JCR *jcr)
153 {
154    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
155
156    P(mutex);
157    spool_stats.data_jobs--;
158    spool_stats.total_data_jobs++;
159    spool_stats.data_size -= jcr->dcr->spool_size;
160    if (spool_stats.data_size < 0) {
161       spool_stats.data_size = 0;
162    }
163    jcr->dcr->spool_size = 0;
164    V(mutex);
165
166    make_unique_data_spool_filename(jcr, &name);
167    close(jcr->dcr->spool_fd);
168    jcr->dcr->spool_fd = -1;
169    jcr->dcr->spooling = false;
170    unlink(name);
171    Dmsg1(100, "Deleted spool file: %s\n", name);
172    free_pool_memory(name);
173    return true;
174 }
175
176 static bool despool_data(DCR *dcr)
177 {
178    DEVICE *rdev;
179    DCR *rdcr;
180    bool ok = true;
181    DEV_BLOCK *block;
182    JCR *jcr = dcr->jcr;
183    int stat;
184
185    Dmsg0(100, "Despooling data\n");
186    dcr->spooling = false;
187    lock_device(dcr->dev);
188    dcr->dev_locked = true; 
189
190    /* Setup a dev structure to read */
191    rdev = (DEVICE *)malloc(sizeof(DEVICE));
192    memset(rdev, 0, sizeof(DEVICE));
193    rdev->dev_name = get_memory(strlen("spool")+1);
194    strcpy(rdev->dev_name, "spool");
195    rdev->errmsg = get_pool_memory(PM_EMSG);
196    *rdev->errmsg = 0;
197    rdev->max_block_size = dcr->dev->max_block_size;
198    rdev->min_block_size = dcr->dev->min_block_size;
199    rdev->device = dcr->dev->device;
200    rdcr = new_dcr(NULL, rdev);
201    rdcr->spool_fd = dcr->spool_fd; 
202    rdcr->jcr = jcr;                   /* set a valid jcr */
203    block = rdcr->block;
204    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
205    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
206
207    for ( ; ok; ) {
208       if (job_canceled(jcr)) {
209          ok = false;
210          break;
211       }
212       stat = read_block_from_spool_file(rdcr, block);
213       if (stat == RB_EOT) {
214          break;
215       } else if (stat == RB_ERROR) {
216          ok = false;
217          break;
218       }
219       ok = write_block_to_device(dcr, block);
220       Dmsg3(100, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
221    }
222
223    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
224    if (ftruncate(rdcr->spool_fd, 0) != 0) {
225       Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file error. ERR=%s\n"), 
226          strerror(errno));
227       Dmsg1(000, "Bad return from ftruncate. ERR=%s\n", strerror(errno));
228       ok = false;
229    }
230
231    P(mutex);
232    spool_stats.data_size -= dcr->spool_size;
233    if (spool_stats.data_size < 0) {
234       spool_stats.data_size = 0;
235    }
236    V(mutex);
237    P(dcr->dev->spool_mutex);
238    dcr->dev->spool_size -= dcr->spool_size;
239    dcr->spool_size = 0;               /* zap size in input dcr */
240    V(dcr->dev->spool_mutex);
241    free_memory(rdev->dev_name);
242    free_pool_memory(rdev->errmsg);
243    free(rdev);
244    rdcr->jcr = NULL;
245    free_dcr(rdcr);
246    unlock_device(dcr->dev);
247    dcr->dev_locked = false;
248    dcr->spooling = true;           /* turn on spooling again */
249    return ok;
250 }
251
252 /*
253  * Read a block from the spool file
254  * 
255  *  Returns RB_OK on success
256  *          RB_EOT when file done
257  *          RB_ERROR on error
258  */
259 static int read_block_from_spool_file(DCR *dcr, DEV_BLOCK *block)
260 {
261    uint32_t rlen;
262    ssize_t stat;
263    spool_hdr hdr;
264
265    rlen = sizeof(hdr);
266    stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
267    if (stat == 0) {
268       Dmsg0(100, "EOT on spool read.\n");
269       return RB_EOT;
270    } else if (stat != (ssize_t)rlen) {
271       if (stat == -1) {
272          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"), strerror(errno));
273       } else {
274          Dmsg2(000, "Spool read error. Wanted %u bytes, got %u\n", rlen, stat);
275          Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %u\n"), rlen, stat);
276       }
277       return RB_ERROR;
278    }
279    rlen = hdr.len;
280    if (rlen > block->buf_len) {
281       Dmsg2(000, "Spool block too big. Max %u bytes, got %u\n", block->buf_len, rlen);
282       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
283       return RB_ERROR;
284    }
285    stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
286    if (stat != (ssize_t)rlen) {
287       Dmsg2(000, "Spool data read error. Wanted %u bytes, got %u\n", rlen, stat);
288       Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %u\n"), rlen, stat);
289       return RB_ERROR;
290    }
291    /* Setup write pointers */
292    block->binbuf = rlen;
293    block->bufp = block->buf + block->binbuf;
294    block->FirstIndex = hdr.FirstIndex;
295    block->LastIndex = hdr.LastIndex;
296    block->VolSessionId = dcr->jcr->VolSessionId;
297    block->VolSessionTime = dcr->jcr->VolSessionTime;
298    Dmsg2(100, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
299    return RB_OK;
300 }
301
302 /*
303  * Write a block to the spool file
304  *
305  *  Returns: true on success or EOT
306  *           false on hard error
307  */
308 bool write_block_to_spool_file(DCR *dcr, DEV_BLOCK *block)
309 {
310    ssize_t stat = 0;
311    uint32_t wlen, hlen;               /* length to write */
312    int retry = 0;
313    spool_hdr hdr;   
314    bool despool = false;
315
316    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
317    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
318       return true;
319    }
320
321    hlen = sizeof(hdr);
322    wlen = block->binbuf;
323    P(dcr->dev->spool_mutex);
324    dcr->spool_size += hlen + wlen;
325    dcr->dev->spool_size += hlen + wlen;
326    if ((dcr->max_spool_size > 0 && dcr->spool_size >= dcr->max_spool_size) ||
327        (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
328       despool = true;
329    }
330    V(dcr->dev->spool_mutex);
331    P(mutex);
332    spool_stats.data_size += hlen + wlen;
333    if (spool_stats.data_size > spool_stats.max_data_size) {
334       spool_stats.max_data_size = spool_stats.data_size;
335    }
336    V(mutex);
337    if (despool) {
338 #ifdef xDEBUG 
339       char ec1[30], ec2[30], ec3[30], ec4[30];
340       Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
341             "max_job_size=%s job_size=%s\n", 
342             edit_uint64_with_commas(dcr->max_spool_size, ec1),
343             edit_uint64_with_commas(dcr->spool_size, ec2),
344             edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
345             edit_uint64_with_commas(dcr->dev->spool_size, ec4));
346 #endif
347       Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached. Despooling ...\n"));
348       if (!despool_data(dcr)) {
349          Dmsg0(000, "Bad return from despool in write_block.\n");
350          return false;
351       }
352       /* Despooling cleard these variables so reset them */
353       P(dcr->dev->spool_mutex);
354       dcr->spool_size += hlen + wlen;
355       dcr->dev->spool_size += hlen + wlen;
356       V(dcr->dev->spool_mutex);
357       Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
358    }  
359
360    hdr.FirstIndex = block->FirstIndex;
361    hdr.LastIndex = block->LastIndex;
362    hdr.len = block->binbuf;
363
364    /* Write header */
365    for ( ;; ) {
366       stat = write(dcr->spool_fd, (char*)&hdr, (size_t)hlen);
367       if (stat == -1) {
368          Jmsg(dcr->jcr, M_INFO, 0, _("Error writing header to spool file. ERR=%s\n"), strerror(errno));
369       }
370       if (stat != (ssize_t)hlen) {
371          if (!despool_data(dcr)) {
372             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
373             return false;
374          }
375          if (retry++ > 1) {
376             return false;
377          }
378          continue;
379       }
380       break;
381    }
382
383    /* Write data */
384    for ( ;; ) {
385       stat = write(dcr->spool_fd, block->buf, (size_t)wlen);
386       if (stat == -1) {
387          Jmsg(dcr->jcr, M_INFO, 0, _("Error writing data to spool file. ERR=%s\n"), strerror(errno));
388       }
389       if (stat != (ssize_t)wlen) {
390          if (!despool_data(dcr)) {
391             Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
392             return false;
393          }
394          if (retry++ > 1) {
395             return false;
396          }
397          continue;
398       }
399       break;
400    }
401    Dmsg2(100, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
402
403    empty_block(block);
404    return true;
405 }
406
407
408 bool are_attributes_spooled(JCR *jcr)
409 {
410    return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
411 }
412
413 /* 
414  * Create spool file for attributes.
415  *  This is done by "attaching" to the bsock, and when
416  *  it is called, the output is written to a file.
417  *  The actual spooling is turned on and off in
418  *  append.c only during writing of the attributes.
419  */
420 bool begin_attribute_spool(JCR *jcr)
421 {
422    if (!jcr->no_attributes && jcr->spool_attributes) {
423       return open_spool_file(jcr, jcr->dir_bsock);
424    }
425    return true;
426 }
427
428 bool discard_attribute_spool(JCR *jcr)
429 {
430    if (are_attributes_spooled(jcr)) {
431       return close_spool_file(jcr, jcr->dir_bsock);
432    }
433    return true;
434 }
435
436 bool commit_attribute_spool(JCR *jcr)
437 {
438    if (are_attributes_spooled(jcr)) {
439       bnet_despool_to_bsock(jcr->dir_bsock);
440       return close_spool_file(jcr, jcr->dir_bsock);
441    }
442    return true;
443 }
444
445 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
446 {
447    Mmsg(name, "%s/%s.attr.spool.%s.%d", working_directory, my_name,
448       jcr->Job, fd);
449 }
450
451 bool open_spool_file(JCR *jcr, BSOCK *bs)
452 {
453     POOLMEM *name  = get_pool_memory(PM_MESSAGE);
454
455     make_unique_spool_filename(jcr, &name, bs->fd);
456     bs->spool_fd = fopen(mp_chr(name), "w+");
457     if (!bs->spool_fd) {
458        Jmsg(jcr, M_ERROR, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name, strerror(errno));
459        free_pool_memory(name);
460        return false;
461     }
462     P(mutex);
463     spool_stats.attr_jobs++;
464     V(mutex);
465     free_pool_memory(name);
466     return true;
467 }
468
469 bool close_spool_file(JCR *jcr, BSOCK *bs)
470 {
471     POOLMEM *name  = get_pool_memory(PM_MESSAGE);
472     ssize_t size;
473      
474     fseek(bs->spool_fd, 0, SEEK_END);
475     size = ftell(bs->spool_fd);
476     P(mutex);
477     if (size > 0) {
478        if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
479           spool_stats.max_attr_size = spool_stats.attr_size + size;
480        } 
481     }
482     spool_stats.attr_jobs--;
483     spool_stats.total_attr_jobs++;
484     V(mutex);
485     make_unique_spool_filename(jcr, &name, bs->fd);
486     fclose(bs->spool_fd);
487     unlink(mp_chr(name));
488     free_pool_memory(name);
489     bs->spool_fd = NULL;
490     bs->spool = false;
491     return true;
492 }