4 * Kern Sibbald, March 2004
9 Copyright (C) 2000-2004 Kern Sibbald and John Walker
11 This program is free software; you can redistribute it and/or
12 modify it under the terms of the GNU General Public License as
13 published by the Free Software Foundation; either version 2 of
14 the License, or (at your option) any later version.
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 General Public License for more details.
21 You should have received a copy of the GNU General Public
22 License along with this program; if not, write to the Free
23 Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
31 /* Forward referenced subroutines */
32 static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name);
33 static bool open_data_spool_file(JCR *jcr);
34 static bool close_data_spool_file(JCR *jcr);
35 static bool despool_data(DCR *dcr);
36 static int read_block_from_spool_file(DCR *dcr, DEV_BLOCK *block);
37 static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
38 static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
40 struct spool_stats_t {
41 uint32_t data_jobs; /* current jobs spooling data */
43 uint32_t total_data_jobs; /* total jobs to have spooled data */
44 uint32_t total_attr_jobs;
45 int64_t max_data_size; /* max data size */
46 int64_t max_attr_size;
47 int64_t data_size; /* current data size (all jobs running) */
51 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
52 spool_stats_t spool_stats;
55 * Header for data spool record */
57 int32_t FirstIndex; /* FirstIndex for buffer */
58 int32_t LastIndex; /* LastIndex for buffer */
59 uint32_t len; /* length of next buffer */
68 void list_spool_stats(BSOCK *bs)
70 char ed1[30], ed2[30];
71 if (spool_stats.data_jobs || spool_stats.max_data_size) {
72 bnet_fsend(bs, "Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n",
73 spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
74 spool_stats.total_data_jobs,
75 edit_uint64_with_commas(spool_stats.max_data_size, ed2));
77 if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
78 bnet_fsend(bs, "Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n",
79 spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
80 spool_stats.total_attr_jobs,
81 edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
85 bool begin_data_spool(JCR *jcr)
88 if (jcr->spool_data) {
89 Dmsg0(100, "Turning on data spooling\n");
90 jcr->dcr->spool_data = true;
91 stat = open_data_spool_file(jcr);
93 jcr->dcr->spooling = true;
94 Jmsg(jcr, M_INFO, 0, _("Spooling data ...\n"));
96 spool_stats.data_jobs++;
103 bool discard_data_spool(JCR *jcr)
105 if (jcr->dcr->spooling) {
106 Dmsg0(100, "Data spooling discarded\n");
107 return close_data_spool_file(jcr);
112 bool commit_data_spool(JCR *jcr)
117 if (jcr->dcr->spooling) {
118 Dmsg0(100, "Committing spooled data\n");
119 Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
120 edit_uint64_with_commas(jcr->dcr->dev->spool_size, ec1));
121 stat = despool_data(jcr->dcr);
123 Dmsg1(000, "Bad return from despool WroteVol=%d\n", jcr->dcr->WroteVol);
124 close_data_spool_file(jcr);
127 return close_data_spool_file(jcr);
132 static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name)
135 if (jcr->dcr->dev->device->spool_directory) {
136 dir = jcr->dcr->dev->device->spool_directory;
138 dir = working_directory;
140 Mmsg(name, "%s/%s.data.spool.%s.%s", dir, my_name, jcr->Job, jcr->device->hdr.name);
144 static bool open_data_spool_file(JCR *jcr)
146 POOLMEM *name = get_pool_memory(PM_MESSAGE);
149 make_unique_data_spool_filename(jcr, &name);
150 if ((spool_fd = open(name, O_CREAT|O_TRUNC|O_RDWR|O_BINARY, 0640)) >= 0) {
151 jcr->dcr->spool_fd = spool_fd;
152 jcr->spool_attributes = true;
154 Jmsg(jcr, M_ERROR, 0, _("Open data spool file %s failed: ERR=%s\n"), name, strerror(errno));
155 free_pool_memory(name);
158 Dmsg1(100, "Created spool file: %s\n", name);
159 free_pool_memory(name);
163 static bool close_data_spool_file(JCR *jcr)
165 POOLMEM *name = get_pool_memory(PM_MESSAGE);
168 spool_stats.data_jobs--;
169 spool_stats.total_data_jobs++;
170 if (spool_stats.data_size < jcr->dcr->spool_size) {
171 spool_stats.data_size = 0;
173 spool_stats.data_size -= jcr->dcr->spool_size;
175 jcr->dcr->spool_size = 0;
178 make_unique_data_spool_filename(jcr, &name);
179 close(jcr->dcr->spool_fd);
180 jcr->dcr->spool_fd = -1;
181 jcr->dcr->spooling = false;
183 Dmsg1(100, "Deleted spool file: %s\n", name);
184 free_pool_memory(name);
188 static bool despool_data(DCR *dcr)
197 Dmsg0(100, "Despooling data\n");
198 dcr->spooling = false;
199 lock_device(dcr->dev);
200 dcr->dev_locked = true;
202 /* Setup a dev structure to read */
203 rdev = (DEVICE *)malloc(sizeof(DEVICE));
204 memset(rdev, 0, sizeof(DEVICE));
205 rdev->dev_name = get_memory(strlen("spool")+1);
206 strcpy(rdev->dev_name, "spool");
207 rdev->errmsg = get_pool_memory(PM_EMSG);
209 rdev->max_block_size = dcr->dev->max_block_size;
210 rdev->min_block_size = dcr->dev->min_block_size;
211 rdev->device = dcr->dev->device;
212 rdcr = new_dcr(NULL, rdev);
213 rdcr->spool_fd = dcr->spool_fd;
214 rdcr->jcr = jcr; /* set a valid jcr */
216 Dmsg1(800, "read/write block size = %d\n", block->buf_len);
217 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
220 if (job_canceled(jcr)) {
224 stat = read_block_from_spool_file(rdcr, block);
225 if (stat == RB_EOT) {
227 } else if (stat == RB_ERROR) {
231 ok = write_block_to_device(dcr, block);
232 Dmsg3(100, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
235 lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
236 if (ftruncate(rdcr->spool_fd, 0) != 0) {
237 Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file error. ERR=%s\n"),
239 Dmsg1(000, "Bad return from ftruncate. ERR=%s\n", strerror(errno));
244 if (spool_stats.data_size < dcr->spool_size) {
245 spool_stats.data_size = 0;
247 spool_stats.data_size -= dcr->spool_size;
250 P(dcr->dev->spool_mutex);
251 dcr->dev->spool_size -= dcr->spool_size;
252 dcr->spool_size = 0; /* zap size in input dcr */
253 V(dcr->dev->spool_mutex);
254 free_memory(rdev->dev_name);
255 free_pool_memory(rdev->errmsg);
259 unlock_device(dcr->dev);
260 dcr->dev_locked = false;
261 dcr->spooling = true; /* turn on spooling again */
266 * Read a block from the spool file
268 * Returns RB_OK on success
269 * RB_EOT when file done
272 static int read_block_from_spool_file(DCR *dcr, DEV_BLOCK *block)
279 stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
281 Dmsg0(100, "EOT on spool read.\n");
283 } else if (stat != (ssize_t)rlen) {
285 Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"), strerror(errno));
287 Dmsg2(000, "Spool read error. Wanted %u bytes, got %u\n", rlen, stat);
288 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %u\n"), rlen, stat);
293 if (rlen > block->buf_len) {
294 Dmsg2(000, "Spool block too big. Max %u bytes, got %u\n", block->buf_len, rlen);
295 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
298 stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
299 if (stat != (ssize_t)rlen) {
300 Dmsg2(000, "Spool data read error. Wanted %u bytes, got %u\n", rlen, stat);
301 Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %u\n"), rlen, stat);
304 /* Setup write pointers */
305 block->binbuf = rlen;
306 block->bufp = block->buf + block->binbuf;
307 block->FirstIndex = hdr.FirstIndex;
308 block->LastIndex = hdr.LastIndex;
309 block->VolSessionId = dcr->jcr->VolSessionId;
310 block->VolSessionTime = dcr->jcr->VolSessionTime;
311 Dmsg2(100, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
316 * Write a block to the spool file
318 * Returns: true on success or EOT
319 * false on hard error
321 bool write_block_to_spool_file(DCR *dcr, DEV_BLOCK *block)
324 uint32_t wlen, hlen; /* length to write */
327 bool despool = false;
329 ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
330 if (block->binbuf <= WRITE_BLKHDR_LENGTH) { /* Does block have data in it? */
335 wlen = block->binbuf;
336 P(dcr->dev->spool_mutex);
337 dcr->spool_size += hlen + wlen;
338 dcr->dev->spool_size += hlen + wlen;
339 if ((dcr->max_spool_size > 0 && dcr->spool_size >= dcr->max_spool_size) ||
340 (dcr->dev->max_spool_size > 0 && dcr->dev->spool_size >= dcr->dev->max_spool_size)) {
343 V(dcr->dev->spool_mutex);
345 spool_stats.data_size += hlen + wlen;
346 if (spool_stats.data_size > spool_stats.max_data_size) {
347 spool_stats.max_data_size = spool_stats.data_size;
353 char ec2[30], ec3[30], ec4[30];
354 Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
355 "max_job_size=%s job_size=%s\n",
356 edit_uint64_with_commas(dcr->max_spool_size, ec1),
357 edit_uint64_with_commas(dcr->spool_size, ec2),
358 edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
359 edit_uint64_with_commas(dcr->dev->spool_size, ec4));
361 Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached. Despooling %s bytes ...\n"),
362 edit_uint64_with_commas(dcr->dev->spool_size, ec1));
363 if (!despool_data(dcr)) {
364 Dmsg0(000, "Bad return from despool in write_block.\n");
367 /* Despooling cleared these variables so reset them */
368 P(dcr->dev->spool_mutex);
369 dcr->spool_size += hlen + wlen;
370 dcr->dev->spool_size += hlen + wlen;
371 V(dcr->dev->spool_mutex);
372 Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
375 hdr.FirstIndex = block->FirstIndex;
376 hdr.LastIndex = block->LastIndex;
377 hdr.len = block->binbuf;
381 stat = write(dcr->spool_fd, (char*)&hdr, (size_t)hlen);
383 Jmsg(dcr->jcr, M_INFO, 0, _("Error writing header to spool file. ERR=%s\n"), strerror(errno));
385 if (stat != (ssize_t)hlen) {
386 if (!despool_data(dcr)) {
387 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
400 stat = write(dcr->spool_fd, block->buf, (size_t)wlen);
402 Jmsg(dcr->jcr, M_INFO, 0, _("Error writing data to spool file. ERR=%s\n"), strerror(errno));
404 if (stat != (ssize_t)wlen) {
405 if (!despool_data(dcr)) {
406 Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
416 Dmsg2(100, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
423 bool are_attributes_spooled(JCR *jcr)
425 return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
429 * Create spool file for attributes.
430 * This is done by "attaching" to the bsock, and when
431 * it is called, the output is written to a file.
432 * The actual spooling is turned on and off in
433 * append.c only during writing of the attributes.
435 bool begin_attribute_spool(JCR *jcr)
437 if (!jcr->no_attributes && jcr->spool_attributes) {
438 return open_attr_spool_file(jcr, jcr->dir_bsock);
443 bool discard_attribute_spool(JCR *jcr)
445 if (are_attributes_spooled(jcr)) {
446 return close_attr_spool_file(jcr, jcr->dir_bsock);
451 static void update_attr_spool_size(ssize_t size)
455 if ((spool_stats.attr_size - size) > 0) {
456 spool_stats.attr_size -= size;
458 spool_stats.attr_size = 0;
464 bool commit_attribute_spool(JCR *jcr)
469 if (are_attributes_spooled(jcr)) {
470 fseek(jcr->dir_bsock->spool_fd, 0, SEEK_END);
471 size = ftell(jcr->dir_bsock->spool_fd);
474 if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
475 spool_stats.max_attr_size = spool_stats.attr_size + size;
478 spool_stats.attr_size += size;
480 Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to DIR. Despooling %s bytes ...\n"),
481 edit_uint64_with_commas(size, ec1));
482 bnet_despool_to_bsock(jcr->dir_bsock, update_attr_spool_size, size);
483 return close_attr_spool_file(jcr, jcr->dir_bsock);
488 static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
490 Mmsg(name, "%s/%s.attr.spool.%s.%d", working_directory, my_name,
495 bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
497 POOLMEM *name = get_pool_memory(PM_MESSAGE);
499 make_unique_spool_filename(jcr, &name, bs->fd);
500 bs->spool_fd = fopen(mp_chr(name), "w+");
502 Jmsg(jcr, M_ERROR, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name, strerror(errno));
503 free_pool_memory(name);
507 spool_stats.attr_jobs++;
509 free_pool_memory(name);
513 bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
520 name = get_pool_memory(PM_MESSAGE);
522 spool_stats.attr_jobs--;
523 spool_stats.total_attr_jobs++;
525 make_unique_spool_filename(jcr, &name, bs->fd);
526 fclose(bs->spool_fd);
527 unlink(mp_chr(name));
528 free_pool_memory(name);