static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name);
static bool open_data_spool_file(JCR *jcr);
static bool close_data_spool_file(JCR *jcr);
-static bool despool_data(DCR *dcr);
+static bool despool_data(DCR *dcr, bool commit);
static int read_block_from_spool_file(DCR *dcr, DEV_BLOCK *block);
+static bool open_attr_spool_file(JCR *jcr, BSOCK *bs);
+static bool close_attr_spool_file(JCR *jcr, BSOCK *bs);
+static bool write_spool_header(DCR *dcr, DEV_BLOCK *block);
+static bool write_spool_data(DCR *dcr, DEV_BLOCK *block);
struct spool_stats_t {
uint32_t data_jobs; /* current jobs spooling data */
uint32_t attr_jobs;
uint32_t total_data_jobs; /* total jobs to have spooled data */
uint32_t total_attr_jobs;
- uint64_t max_data_size; /* max data size */
- uint64_t max_attr_size;
- uint64_t data_size; /* current data size (all jobs running) */
- uint64_t attr_size;
+ int64_t max_data_size; /* max data size */
+ int64_t max_attr_size;
+ int64_t data_size; /* current data size (all jobs running) */
+ int64_t attr_size;
};
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
{
char ed1[30], ed2[30];
if (spool_stats.data_jobs || spool_stats.max_data_size) {
- bnet_fsend(bs, "Data spooling: %d active jobs, %s bytes; %d total jobs, %s max bytes/job.\n",
+ bnet_fsend(bs, "Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n",
spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
spool_stats.total_data_jobs,
edit_uint64_with_commas(spool_stats.max_data_size, ed2));
}
if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
- bnet_fsend(bs, "Attr spooling: %d active jobs; %d total jobs, %s max bytes/job.\n",
- spool_stats.attr_jobs, spool_stats.total_attr_jobs,
- edit_uint64_with_commas(spool_stats.max_attr_size, ed1));
+ bnet_fsend(bs, "Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n",
+ spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
+ spool_stats.total_attr_jobs,
+ edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
}
}
bool commit_data_spool(JCR *jcr)
{
bool stat;
- char ec1[40];
if (jcr->dcr->spooling) {
Dmsg0(100, "Committing spooled data\n");
- Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
- edit_uint64_with_commas(jcr->dcr->dev->spool_size, ec1));
- stat = despool_data(jcr->dcr);
+ stat = despool_data(jcr->dcr, true /*commit*/);
if (!stat) {
Dmsg1(000, "Bad return from despool WroteVol=%d\n", jcr->dcr->WroteVol);
close_data_spool_file(jcr);
static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name)
{
- char *dir;
+ const char *dir;
if (jcr->dcr->dev->device->spool_directory) {
dir = jcr->dcr->dev->device->spool_directory;
} else {
return true;
}
-static bool despool_data(DCR *dcr)
+static const char *spool_name = "*spool*";
+
+static bool despool_data(DCR *dcr, bool commit)
{
DEVICE *rdev;
DCR *rdcr;
DEV_BLOCK *block;
JCR *jcr = dcr->jcr;
int stat;
+ char ec1[50];
Dmsg0(100, "Despooling data\n");
+ Jmsg(jcr, M_INFO, 0, _("%s spooled data to Volume. Despooling %s bytes ...\n"),
+ commit?"Committing":"Writting",
+ edit_uint64_with_commas(jcr->dcr->dev->spool_size, ec1));
dcr->spooling = false;
lock_device(dcr->dev);
dcr->dev_locked = true;
- /* Setup a dev structure to read */
+ /*
+ * This is really quite kludgy and should be fixed some time.
+ * We create a dev structure to read from the spool file
+ * in rdev and rdcr.
+ */
rdev = (DEVICE *)malloc(sizeof(DEVICE));
memset(rdev, 0, sizeof(DEVICE));
- rdev->dev_name = get_memory(strlen("spool")+1);
- strcpy(rdev->dev_name, "spool");
+ rdev->dev_name = get_memory(strlen(spool_name)+1);
+ strcpy(rdev->dev_name, spool_name);
rdev->errmsg = get_pool_memory(PM_EMSG);
*rdev->errmsg = 0;
rdev->max_block_size = dcr->dev->max_block_size;
rdcr->spool_fd = dcr->spool_fd;
rdcr->jcr = jcr; /* set a valid jcr */
block = rdcr->block;
+
Dmsg1(800, "read/write block size = %d\n", block->buf_len);
lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
V(dcr->dev->spool_mutex);
free_memory(rdev->dev_name);
free_pool_memory(rdev->errmsg);
- free(rdev);
+ /* Be careful to NULL the jcr and free rdev after free_dcr() */
rdcr->jcr = NULL;
free_dcr(rdcr);
+ free(rdev);
unlock_device(dcr->dev);
dcr->dev_locked = false;
dcr->spooling = true; /* turn on spooling again */
*/
bool write_block_to_spool_file(DCR *dcr, DEV_BLOCK *block)
{
- ssize_t stat = 0;
uint32_t wlen, hlen; /* length to write */
- int retry = 0;
- spool_hdr hdr;
bool despool = false;
ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
return true;
}
- hlen = sizeof(hdr);
+ hlen = sizeof(spool_hdr);
wlen = block->binbuf;
P(dcr->dev->spool_mutex);
dcr->spool_size += hlen + wlen;
}
V(mutex);
if (despool) {
- char ec1[30];
#ifdef xDEBUG
- char ec2[30], ec3[30], ec4[30];
+ char ec1[30], ec2[30], ec3[30], ec4[30];
Dmsg4(100, "Despool in write_block_to_spool_file max_size=%s size=%s "
"max_job_size=%s job_size=%s\n",
edit_uint64_with_commas(dcr->max_spool_size, ec1),
edit_uint64_with_commas(dcr->dev->max_spool_size, ec3),
edit_uint64_with_commas(dcr->dev->spool_size, ec4));
#endif
- Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached. Despooling %s bytes ...\n"),
- edit_uint64_with_commas(dcr->dev->spool_size, ec1));
- if (!despool_data(dcr)) {
+ Jmsg(dcr->jcr, M_INFO, 0, _("User specified spool size reached.\n"));
+ if (!despool_data(dcr, false)) {
Dmsg0(000, "Bad return from despool in write_block.\n");
return false;
}
Jmsg(dcr->jcr, M_INFO, 0, _("Spooling data again ...\n"));
}
+
+ if (!write_spool_header(dcr, block)) {
+ return false;
+ }
+ if (!write_spool_data(dcr, block)) {
+ return false;
+ }
+
+ Dmsg2(100, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
+ empty_block(block);
+ return true;
+}
+
+static bool write_spool_header(DCR *dcr, DEV_BLOCK *block)
+{
+ spool_hdr hdr;
+ ssize_t stat;
+
hdr.FirstIndex = block->FirstIndex;
hdr.LastIndex = block->LastIndex;
hdr.len = block->binbuf;
/* Write header */
- for ( ;; ) {
- stat = write(dcr->spool_fd, (char*)&hdr, (size_t)hlen);
+ for (int retry=0; retry<=1; retry++) {
+ stat = write(dcr->spool_fd, (char*)&hdr, sizeof(hdr));
if (stat == -1) {
Jmsg(dcr->jcr, M_INFO, 0, _("Error writing header to spool file. ERR=%s\n"), strerror(errno));
}
- if (stat != (ssize_t)hlen) {
- if (!despool_data(dcr)) {
- Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
- return false;
+ if (stat != (ssize_t)sizeof(hdr)) {
+ /* If we wrote something, truncate it, then despool */
+ if (stat != -1) {
+ ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR) - stat);
}
- if (retry++ > 1) {
+ if (!despool_data(dcr, false)) {
+ Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
return false;
}
- continue;
+ continue; /* try again */
}
- break;
+ return true;
}
+ Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after header spooling error failed.\n"));
+ return false;
+}
+
+static bool write_spool_data(DCR *dcr, DEV_BLOCK *block)
+{
+ ssize_t stat;
/* Write data */
- for ( ;; ) {
- stat = write(dcr->spool_fd, block->buf, (size_t)wlen);
+ for (int retry=0; retry<=1; retry++) {
+ stat = write(dcr->spool_fd, block->buf, (size_t)block->binbuf);
if (stat == -1) {
Jmsg(dcr->jcr, M_INFO, 0, _("Error writing data to spool file. ERR=%s\n"), strerror(errno));
}
- if (stat != (ssize_t)wlen) {
- if (!despool_data(dcr)) {
+ if (stat != (ssize_t)block->binbuf) {
+ /*
+ * If we wrote something, truncate it and the header, then despool
+ */
+ if (stat != -1) {
+ ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR)
+ - stat - sizeof(spool_hdr));
+ }
+ if (!despool_data(dcr, false)) {
Jmsg(dcr->jcr, M_FATAL, 0, _("Fatal despooling error."));
return false;
}
- if (retry++ > 1) {
+ if (!write_spool_header(dcr, block)) {
return false;
}
- continue;
+ continue; /* try again */
}
- break;
+ return true;
}
- Dmsg2(100, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
-
- empty_block(block);
- return true;
+ Jmsg(dcr->jcr, M_FATAL, 0, _("Retrying after data spooling error failed.\n"));
+ return false;
}
+
bool are_attributes_spooled(JCR *jcr)
{
return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
bool begin_attribute_spool(JCR *jcr)
{
if (!jcr->no_attributes && jcr->spool_attributes) {
- return open_spool_file(jcr, jcr->dir_bsock);
+ return open_attr_spool_file(jcr, jcr->dir_bsock);
}
return true;
}
bool discard_attribute_spool(JCR *jcr)
{
if (are_attributes_spooled(jcr)) {
- return close_spool_file(jcr, jcr->dir_bsock);
+ return close_attr_spool_file(jcr, jcr->dir_bsock);
}
return true;
}
+static void update_attr_spool_size(ssize_t size)
+{
+ P(mutex);
+ if (size > 0) {
+ if ((spool_stats.attr_size - size) > 0) {
+ spool_stats.attr_size -= size;
+ } else {
+ spool_stats.attr_size = 0;
+ }
+ }
+ V(mutex);
+}
+
bool commit_attribute_spool(JCR *jcr)
{
+ ssize_t size;
+ char ec1[30];
+
if (are_attributes_spooled(jcr)) {
- bnet_despool_to_bsock(jcr->dir_bsock);
- return close_spool_file(jcr, jcr->dir_bsock);
+ fseek(jcr->dir_bsock->spool_fd, 0, SEEK_END);
+ size = ftell(jcr->dir_bsock->spool_fd);
+ P(mutex);
+ if (size > 0) {
+ if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
+ spool_stats.max_attr_size = spool_stats.attr_size + size;
+ }
+ }
+ spool_stats.attr_size += size;
+ V(mutex);
+ Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to DIR. Despooling %s bytes ...\n"),
+ edit_uint64_with_commas(size, ec1));
+ bnet_despool_to_bsock(jcr->dir_bsock, update_attr_spool_size, size);
+ return close_attr_spool_file(jcr, jcr->dir_bsock);
}
return true;
}
jcr->Job, fd);
}
-bool open_spool_file(JCR *jcr, BSOCK *bs)
+
+bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
{
- POOLMEM *name = get_pool_memory(PM_MESSAGE);
-
- make_unique_spool_filename(jcr, &name, bs->fd);
- bs->spool_fd = fopen(mp_chr(name), "w+");
- if (!bs->spool_fd) {
- Jmsg(jcr, M_ERROR, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name, strerror(errno));
- free_pool_memory(name);
- return false;
- }
- P(mutex);
- spool_stats.attr_jobs++;
- V(mutex);
- free_pool_memory(name);
- return true;
+ POOLMEM *name = get_pool_memory(PM_MESSAGE);
+
+ make_unique_spool_filename(jcr, &name, bs->fd);
+ bs->spool_fd = fopen(mp_chr(name), "w+");
+ if (!bs->spool_fd) {
+ Jmsg(jcr, M_ERROR, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name, strerror(errno));
+ free_pool_memory(name);
+ return false;
+ }
+ P(mutex);
+ spool_stats.attr_jobs++;
+ V(mutex);
+ free_pool_memory(name);
+ return true;
}
-bool close_spool_file(JCR *jcr, BSOCK *bs)
+bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
{
- POOLMEM *name = get_pool_memory(PM_MESSAGE);
- ssize_t size;
-
- fseek(bs->spool_fd, 0, SEEK_END);
- size = ftell(bs->spool_fd);
- P(mutex);
- if (size > 0) {
- if (spool_stats.attr_size + size > spool_stats.max_attr_size) {
- spool_stats.max_attr_size = spool_stats.attr_size + size;
- }
- }
- spool_stats.attr_jobs--;
- spool_stats.total_attr_jobs++;
- V(mutex);
- make_unique_spool_filename(jcr, &name, bs->fd);
- fclose(bs->spool_fd);
- unlink(mp_chr(name));
- free_pool_memory(name);
- bs->spool_fd = NULL;
- bs->spool = false;
- return true;
+ POOLMEM *name;
+
+ if (!bs->spool_fd) {
+ return true;
+ }
+ name = get_pool_memory(PM_MESSAGE);
+ P(mutex);
+ spool_stats.attr_jobs--;
+ spool_stats.total_attr_jobs++;
+ V(mutex);
+ make_unique_spool_filename(jcr, &name, bs->fd);
+ fclose(bs->spool_fd);
+ unlink(mp_chr(name));
+ free_pool_memory(name);
+ bs->spool_fd = NULL;
+ bs->spool = false;
+ return true;
}