/* Forward referenced subroutines */
static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name);
-static int open_data_spool_file(JCR *jcr);
-static int close_data_spool_file(JCR *jcr);
+static bool open_data_spool_file(JCR *jcr);
+static bool close_data_spool_file(JCR *jcr);
static bool despool_data(DCR *dcr);
-static int read_block_from_spool_file(DCR *dcr, DEV_BLOCK *block);
+static int read_block_from_spool_file(DCR *dcr, DEV_BLOCK *block);
+
+struct spool_stats_t {
+ uint32_t data_jobs; /* current jobs spooling data */
+ uint32_t attr_jobs;
+ uint32_t total_data_jobs; /* total jobs to have spooled data */
+ uint32_t total_attr_jobs;
+ uint64_t max_data_size; /* max data size */
+ uint64_t max_attr_size;
+ uint64_t data_size; /* current data size (all jobs running) */
+ uint64_t attr_size;
+};
+
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+spool_stats_t spool_stats;
+/*
+ * Header for data spool record */
struct spool_hdr {
- int32_t FirstIndex;
- int32_t LastIndex;
- uint32_t len;
+ int32_t FirstIndex; /* FirstIndex for buffer */
+ int32_t LastIndex; /* LastIndex for buffer */
+ uint32_t len; /* length of next buffer */
};
enum {
RB_OK
};
+void list_spool_stats(BSOCK *bs)
+{
+ char ed1[30], ed2[30];
+ bnet_fsend(bs, "Data spooling: %d active jobs %s bytes; %d total jobs %s max bytes/job.\n",
+ spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
+ spool_stats.total_data_jobs, edit_uint64_with_commas(spool_stats.max_data_size, ed2));
+ bnet_fsend(bs, "Attr spooling: %d active jobs; %d total jobs %s max bytes/job.\n",
+ spool_stats.attr_jobs,
+ spool_stats.total_attr_jobs, edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
+}
+
bool begin_data_spool(JCR *jcr)
{
bool stat = true;
stat = open_data_spool_file(jcr);
if (stat) {
jcr->dcr->spooling = true;
+ Jmsg(jcr, M_INFO, 0, _("Spooling data ...\n"));
+ P(mutex);
+ spool_stats.data_jobs++;
+ V(mutex);
}
}
return stat;
}
-static int open_data_spool_file(JCR *jcr)
+static bool open_data_spool_file(JCR *jcr)
{
POOLMEM *name = get_pool_memory(PM_MESSAGE);
int spool_fd;
jcr->dcr->spool_fd = spool_fd;
jcr->spool_attributes = true;
} else {
- Jmsg(jcr, M_ERROR, 0, "open data spool file %s failed: ERR=%s\n", name, strerror(errno));
+ Jmsg(jcr, M_ERROR, 0, _("Open data spool file %s failed: ERR=%s\n"), name, strerror(errno));
free_pool_memory(name);
- return 0;
+ return false;
}
Dmsg1(100, "Created spool file: %s\n", name);
free_pool_memory(name);
- return 1;
+ return true;
}
-static int close_data_spool_file(JCR *jcr)
+static bool close_data_spool_file(JCR *jcr)
{
POOLMEM *name = get_pool_memory(PM_MESSAGE);
+ P(mutex);
+ spool_stats.data_jobs--;
+ spool_stats.total_data_jobs++;
+ spool_stats.data_size -= jcr->dcr->spool_size;
+ V(mutex);
+
make_unique_data_spool_filename(jcr, &name);
close(jcr->dcr->spool_fd);
jcr->dcr->spool_fd = -1;
unlink(name);
Dmsg1(100, "Deleted spool file: %s\n", name);
free_pool_memory(name);
- return 1;
+ return true;
}
static bool despool_data(DCR *dcr)
lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
if (ftruncate(rdcr->spool_fd, 0) != 0) {
+ Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file error. ERR=%s\n"),
+ strerror(errno));
Dmsg1(000, "Bad return from ftruncate. ERR=%s\n", strerror(errno));
ok = false;
}
+ P(mutex);
+ spool_stats.data_size -= dcr->spool_size;
+ V(mutex);
P(dcr->dev->spool_mutex);
dcr->dev->spool_size -= dcr->spool_size;
dcr->spool_size = 0; /* zap size in input dcr */
V(dcr->dev->spool_mutex);
-
free_memory(rdev->dev_name);
free_pool_memory(rdev->errmsg);
free(rdev);
return RB_EOT;
} else if (stat != (ssize_t)rlen) {
if (stat == -1) {
- Jmsg(dcr->jcr, M_FATAL, 0, "Spool read error. ERR=%s\n", strerror(errno));
+ Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"), strerror(errno));
} else {
Dmsg2(000, "Spool read error. Wanted %u bytes, got %u\n", rlen, stat);
- Jmsg2(dcr->jcr, M_FATAL, 0, "Spool read error. Wanted %u bytes, got %u\n", rlen, stat);
+ Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %u\n"), rlen, stat);
}
return RB_ERROR;
}
rlen = hdr.len;
if (rlen > block->buf_len) {
Dmsg2(000, "Spool block too big. Max %u bytes, got %u\n", block->buf_len, rlen);
- Jmsg2(dcr->jcr, M_FATAL, 0, "Spool block too big. Max %u bytes, got %u\n", block->buf_len, rlen);
+ Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool block too big. Max %u bytes, got %u\n"), block->buf_len, rlen);
return RB_ERROR;
}
stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
if (stat != (ssize_t)rlen) {
- Dmsg2(000, "Spool read error. Wanted %u bytes, got %u\n", rlen, stat);
- Jmsg2(dcr->jcr, M_FATAL, 0, "Spool read error. Wanted %u bytes, got %u\n", rlen, stat);
+ Dmsg2(000, "Spool data read error. Wanted %u bytes, got %u\n", rlen, stat);
+ Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool data read error. Wanted %u bytes, got %u\n"), rlen, stat);
return RB_ERROR;
}
/* Setup write pointers */
ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
if (block->binbuf <= WRITE_BLKHDR_LENGTH) { /* Does block have data in it? */
- Dmsg0(100, "return write_block_to_dev no data to write\n");
return true;
}
despool = true;
}
V(dcr->dev->spool_mutex);
+ P(mutex);
+ spool_stats.data_size += hlen + wlen;
+ if (spool_stats.data_size > spool_stats.max_data_size) {
+ spool_stats.max_data_size = spool_stats.data_size;
+ }
+ V(mutex);
if (despool) {
char ec1[30], ec2[30], ec3[30], ec4[30];
Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
edit_uint64_with_commas(dcr->spool_size, ec2),
edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
edit_uint64_with_commas(dcr->dev->spool_size, ec4));
- despool = false;
+ Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached. Despooling ...\n"));
if (!despool_data(dcr)) {
Dmsg0(000, "Bad return from despool in write_block.\n");
return false;
}
+ /* Despooling cleard these variables so reset them */
P(dcr->dev->spool_mutex);
dcr->spool_size += hlen + wlen;
dcr->dev->spool_size += hlen + wlen;
V(dcr->dev->spool_mutex);
+ Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
}
hdr.FirstIndex = block->FirstIndex;
/* Write header */
for ( ;; ) {
stat = write(dcr->spool_fd, (char*)&hdr, (size_t)hlen);
+ if (stat == -1) {
+ Jmsg(dcr->jcr, M_INFO, 0, _("Error writing header to spool file. ERR=%s\n"), strerror(errno));
+ }
if (stat != (ssize_t)hlen) {
if (!despool_data(dcr)) {
+ Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
return false;
}
if (retry++ > 1) {
/* Write data */
for ( ;; ) {
stat = write(dcr->spool_fd, block->buf, (size_t)wlen);
+ if (stat == -1) {
+ Jmsg(dcr->jcr, M_INFO, 0, _("Error writing data to spool file. ERR=%s\n"), strerror(errno));
+ }
if (stat != (ssize_t)wlen) {
if (!despool_data(dcr)) {
+ Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
return false;
}
if (retry++ > 1) {
static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
{
- Mmsg(name, "%s/%s.spool.%s.%d", working_directory, my_name,
+ Mmsg(name, "%s/%s.attr.spool.%s.%d", working_directory, my_name,
jcr->Job, fd);
}
make_unique_spool_filename(jcr, &name, bs->fd);
bs->spool_fd = fopen(mp_chr(name), "w+");
if (!bs->spool_fd) {
- Jmsg(jcr, M_ERROR, 0, "fopen spool file %s failed: ERR=%s\n", name, strerror(errno));
+ Jmsg(jcr, M_ERROR, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name, strerror(errno));
free_pool_memory(name);
return false;
}
+ P(mutex);
+ spool_stats.attr_jobs++;
+ V(mutex);
free_pool_memory(name);
return true;
}
bool close_spool_file(JCR *jcr, BSOCK *bs)
{
POOLMEM *name = get_pool_memory(PM_MESSAGE);
-
+ ssize_t size;
+
+ fseek(bs->spool_fd, 0, SEEK_END);
+ size = ftell(bs->spool_fd);
+ P(mutex);
+ if (size > 0) {
+ if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
+ spool_stats.max_attr_size = spool_stats.attr_size + size;
+ }
+ }
+ spool_stats.attr_jobs--;
+ spool_stats.total_attr_jobs++;
+ V(mutex);
make_unique_spool_filename(jcr, &name, bs->fd);
fclose(bs->spool_fd);
unlink(mp_chr(name));