4 * Kern Sibbald, March 2004
9 Copyright (C) 2000-2004 Kern Sibbald and John Walker
11 This program is free software; you can redistribute it and/or
12 modify it under the terms of the GNU General Public License as
13 published by the Free Software Foundation; either version 2 of
14 the License, or (at your option) any later version.
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 General Public License for more details.
21 You should have received a copy of the GNU General Public
22 License along with this program; if not, write to the Free
23 Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
31 /* Forward referenced subroutines */
32 static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name);
33 static bool open_data_spool_file(JCR *jcr);
34 static bool close_data_spool_file(JCR *jcr);
35 static bool despool_data(DCR *dcr);
36 static int read_block_from_spool_file(DCR *dcr, DEV_BLOCK *block);
38 struct spool_stats_t {
39 uint32_t data_jobs; /* current jobs spooling data */
41 uint32_t total_data_jobs; /* total jobs to have spooled data */
42 uint32_t total_attr_jobs;
43 uint64_t max_data_size; /* max data size */
44 uint64_t max_attr_size;
45 uint64_t data_size; /* current data size (all jobs running) */
49 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
50 spool_stats_t spool_stats;
53 * Header for data spool record */
55 int32_t FirstIndex; /* FirstIndex for buffer */
56 int32_t LastIndex; /* LastIndex for buffer */
57 uint32_t len; /* length of next buffer */
66 void list_spool_stats(BSOCK *bs)
68 char ed1[30], ed2[30];
69 if (spool_stats.data_jobs || spool_stats.max_data_size) {
70 bnet_fsend(bs, "Data spooling: %d active jobs, %s bytes; %d total jobs, %s max bytes/job.\n",
71 spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
72 spool_stats.total_data_jobs,
73 edit_uint64_with_commas(spool_stats.max_data_size, ed2));
75 if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
76 bnet_fsend(bs, "Attr spooling: %d active jobs; %d total jobs, %s max bytes/job.\n",
77 spool_stats.attr_jobs, spool_stats.total_attr_jobs,
78 edit_uint64_with_commas(spool_stats.max_attr_size, ed1));
82 bool begin_data_spool(JCR *jcr)
85 if (jcr->spool_data) {
86 Dmsg0(100, "Turning on data spooling\n");
87 jcr->dcr->spool_data = true;
88 stat = open_data_spool_file(jcr);
90 jcr->dcr->spooling = true;
91 Jmsg(jcr, M_INFO, 0, _("Spooling data ...\n"));
93 spool_stats.data_jobs++;
100 bool discard_data_spool(JCR *jcr)
102 if (jcr->dcr->spooling) {
103 Dmsg0(100, "Data spooling discarded\n");
104 return close_data_spool_file(jcr);
109 bool commit_data_spool(JCR *jcr)
114 if (jcr->dcr->spooling) {
115 Dmsg0(100, "Committing spooled data\n");
116 Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
117 edit_uint64_with_commas(jcr->dcr->dev->spool_size, ec1));
118 stat = despool_data(jcr->dcr);
120 Dmsg1(000, "Bad return from despool WroteVol=%d\n", jcr->dcr->WroteVol);
121 close_data_spool_file(jcr);
124 return close_data_spool_file(jcr);
129 static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name)
132 if (jcr->dcr->dev->device->spool_directory) {
133 dir = jcr->dcr->dev->device->spool_directory;
135 dir = working_directory;
137 Mmsg(name, "%s/%s.data.spool.%s.%s", dir, my_name, jcr->Job, jcr->device->hdr.name);
141 static bool open_data_spool_file(JCR *jcr)
143 POOLMEM *name = get_pool_memory(PM_MESSAGE);
146 make_unique_data_spool_filename(jcr, &name);
147 if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
148 jcr->dcr->spool_fd = spool_fd;
149 jcr->spool_attributes = true;
151 Jmsg(jcr, M_ERROR, 0, _("Open data spool file %s failed: ERR=%s\n"), name, strerror(errno));
152 free_pool_memory(name);
155 Dmsg1(100, "Created spool file: %s\n", name);
156 free_pool_memory(name);
160 static bool close_data_spool_file(JCR *jcr)
162 POOLMEM *name = get_pool_memory(PM_MESSAGE);
165 spool_stats.data_jobs--;
166 spool_stats.total_data_jobs++;
167 spool_stats.data_size -= jcr->dcr->spool_size;
168 if (spool_stats.data_size < 0) {
169 spool_stats.data_size = 0;
171 jcr->dcr->spool_size = 0;
174 make_unique_data_spool_filename(jcr, &name);
175 close(jcr->dcr->spool_fd);
176 jcr->dcr->spool_fd = -1;
177 jcr->dcr->spooling = false;
179 Dmsg1(100, "Deleted spool file: %s\n", name);
180 free_pool_memory(name);
184 static bool despool_data(DCR *dcr)
193 Dmsg0(100, "Despooling data\n");
194 dcr->spooling = false;
195 lock_device(dcr->dev);
196 dcr->dev_locked = true;
198 /* Setup a dev structure to read */
199 rdev = (DEVICE *)malloc(sizeof(DEVICE));
200 memset(rdev, 0, sizeof(DEVICE));
201 rdev->dev_name = get_memory(strlen("spool")+1);
202 strcpy(rdev->dev_name, "spool");
203 rdev->errmsg = get_pool_memory(PM_EMSG);
205 rdev->max_block_size = dcr->dev->max_block_size;
206 rdev->min_block_size = dcr->dev->min_block_size;
207 rdev->device = dcr->dev->device;
208 rdcr = new_dcr(NULL, rdev);
209 rdcr->spool_fd = dcr->spool_fd;
210 rdcr->jcr = jcr; /* set a valid jcr */
212 Dmsg1(800, "read/write block size = %d\n", block->buf_len);
213 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
216 if (job_canceled(jcr)) {
220 stat = read_block_from_spool_file(rdcr, block);
221 if (stat == RB_EOT) {
223 } else if (stat == RB_ERROR) {
227 ok = write_block_to_device(dcr, block);
228 Dmsg3(100, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
231 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
232 if (ftruncate(rdcr->spool_fd, 0) != 0) {
233 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file error. ERR=%s\n"),
235 Dmsg1(000, "Bad return from ftruncate. ERR=%s\n", strerror(errno));
240 spool_stats.data_size -= dcr->spool_size;
241 if (spool_stats.data_size < 0) {
242 spool_stats.data_size = 0;
245 P(dcr->dev->spool_mutex);
246 dcr->dev->spool_size -= dcr->spool_size;
247 dcr->spool_size = 0; /* zap size in input dcr */
248 V(dcr->dev->spool_mutex);
249 free_memory(rdev->dev_name);
250 free_pool_memory(rdev->errmsg);
254 unlock_device(dcr->dev);
255 dcr->dev_locked = false;
256 dcr->spooling = true; /* turn on spooling again */
261 * Read a block from the spool file
263 * Returns RB_OK on success
264 * RB_EOT when file done
267 static int read_block_from_spool_file(DCR *dcr, DEV_BLOCK *block)
274 stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
276 Dmsg0(100, "EOT on spool read.\n");
278 } else if (stat != (ssize_t)rlen) {
280 Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"), strerror(errno));
282 Dmsg2(000, "Spool read error. Wanted %u bytes, got %u\n", rlen, stat);
283 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %u\n"), rlen, stat);
288 if (rlen > block->buf_len) {
289 Dmsg2(000, "Spool block too big. Max %u bytes, got %u\n", block->buf_len, rlen);
290 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
293 stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
294 if (stat != (ssize_t)rlen) {
295 Dmsg2(000, "Spool data read error. Wanted %u bytes, got %u\n", rlen, stat);
296 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %u\n"), rlen, stat);
299 /* Setup write pointers */
300 block->binbuf = rlen;
301 block->bufp = block->buf + block->binbuf;
302 block->FirstIndex = hdr.FirstIndex;
303 block->LastIndex = hdr.LastIndex;
304 block->VolSessionId = dcr->jcr->VolSessionId;
305 block->VolSessionTime = dcr->jcr->VolSessionTime;
306 Dmsg2(100, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
311 * Write a block to the spool file
313 * Returns: true on success or EOT
314 * false on hard error
316 bool write_block_to_spool_file(DCR *dcr, DEV_BLOCK *block)
319 uint32_t wlen, hlen; /* length to write */
322 bool despool = false;
324 ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
325 if (block->binbuf <= WRITE_BLKHDR_LENGTH) { /* Does block have data in it? */
330 wlen = block->binbuf;
331 P(dcr->dev->spool_mutex);
332 dcr->spool_size += hlen + wlen;
333 dcr->dev->spool_size += hlen + wlen;
334 if ((dcr->max_spool_size > 0 && dcr->spool_size >= dcr->max_spool_size) ||
335 (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
338 V(dcr->dev->spool_mutex);
340 spool_stats.data_size += hlen + wlen;
341 if (spool_stats.data_size > spool_stats.max_data_size) {
342 spool_stats.max_data_size = spool_stats.data_size;
348 char ec2[30], ec3[30], ec4[30];
349 Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
350 "max_job_size=%s job_size=%s\n",
351 edit_uint64_with_commas(dcr->max_spool_size, ec1),
352 edit_uint64_with_commas(dcr->spool_size, ec2),
353 edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
354 edit_uint64_with_commas(dcr->dev->spool_size, ec4));
356 Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached. Despooling %s bytes ...\n"),
357 edit_uint64_with_commas(dcr->dev->spool_size, ec1));
358 if (!despool_data(dcr)) {
359 Dmsg0(000, "Bad return from despool in write_block.\n");
362 /* Despooling cleared these variables so reset them */
363 P(dcr->dev->spool_mutex);
364 dcr->spool_size += hlen + wlen;
365 dcr->dev->spool_size += hlen + wlen;
366 V(dcr->dev->spool_mutex);
367 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
370 hdr.FirstIndex = block->FirstIndex;
371 hdr.LastIndex = block->LastIndex;
372 hdr.len = block->binbuf;
376 stat = write(dcr->spool_fd, (char*)&hdr, (size_t)hlen);
378 Jmsg(dcr->jcr, M_INFO, 0, _("Error writing header to spool file. ERR=%s\n"), strerror(errno));
380 if (stat != (ssize_t)hlen) {
381 if (!despool_data(dcr)) {
382 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
395 stat = write(dcr->spool_fd, block->buf, (size_t)wlen);
397 Jmsg(dcr->jcr, M_INFO, 0, _("Error writing data to spool file. ERR=%s\n"), strerror(errno));
399 if (stat != (ssize_t)wlen) {
400 if (!despool_data(dcr)) {
401 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
411 Dmsg2(100, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
418 bool are_attributes_spooled(JCR *jcr)
420 return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
424 * Create spool file for attributes.
425 * This is done by "attaching" to the bsock, and when
426 * it is called, the output is written to a file.
427 * The actual spooling is turned on and off in
428 * append.c only during writing of the attributes.
430 bool begin_attribute_spool(JCR *jcr)
432 if (!jcr->no_attributes && jcr->spool_attributes) {
433 return open_spool_file(jcr, jcr->dir_bsock);
438 bool discard_attribute_spool(JCR *jcr)
440 if (are_attributes_spooled(jcr)) {
441 return close_spool_file(jcr, jcr->dir_bsock);
446 bool commit_attribute_spool(JCR *jcr)
448 if (are_attributes_spooled(jcr)) {
449 bnet_despool_to_bsock(jcr->dir_bsock);
450 return close_spool_file(jcr, jcr->dir_bsock);
455 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
457 Mmsg(name, "%s/%s.attr.spool.%s.%d", working_directory, my_name,
461 bool open_spool_file(JCR *jcr, BSOCK *bs)
463 POOLMEM *name = get_pool_memory(PM_MESSAGE);
465 make_unique_spool_filename(jcr, &name, bs->fd);
466 bs->spool_fd = fopen(mp_chr(name), "w+");
468 Jmsg(jcr, M_ERROR, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name, strerror(errno));
469 free_pool_memory(name);
473 spool_stats.attr_jobs++;
475 free_pool_memory(name);
479 bool close_spool_file(JCR *jcr, BSOCK *bs)
481 POOLMEM *name = get_pool_memory(PM_MESSAGE);
484 fseek(bs->spool_fd, 0, SEEK_END);
485 size = ftell(bs->spool_fd);
488 if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
489 spool_stats.max_attr_size = spool_stats.attr_size + size;
492 spool_stats.attr_jobs--;
493 spool_stats.total_attr_jobs++;
495 make_unique_spool_filename(jcr, &name, bs->fd);
496 fclose(bs->spool_fd);
497 unlink(mp_chr(name));
498 free_pool_memory(name);