]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/stored/spool.c
More data spooling updates
[bacula/bacula] / bacula / src / stored / spool.c
index d825963e1cf5e01faff04638c23fb91496ad3939..3f1aefe67fd599cd460bc0b92762b49ea961a0fc 100644 (file)
@@ -33,19 +33,38 @@ static void make_unique_data_spool_filename(JCR *jcr, POOLMEM **name);
 static int open_data_spool_file(JCR *jcr);
 static int close_data_spool_file(JCR *jcr);
 static bool despool_data(DCR *dcr);
+static int read_block_from_spool_file(DCR *dcr, DEV_BLOCK *block);
 
+struct spool_hdr {
+   int32_t  FirstIndex;
+   int32_t  LastIndex;
+   uint32_t len;
+};
+
+enum {
+   RB_EOT = 1,
+   RB_ERROR,
+   RB_OK
+};
 
 int begin_data_spool(JCR *jcr)
 {
-   if (jcr->dcr->spool_data) {
-      return open_data_spool_file(jcr);
+   int stat = 1;
+   if (jcr->spool_data) {
+      Dmsg0(100, "Turning on data spooling\n");
+      jcr->dcr->spool_data = true;
+      stat = open_data_spool_file(jcr);
+      if (stat) {
+        jcr->dcr->spooling = true;
+      }
    }
-   return 1;
+   return stat;
 }
 
 int discard_data_spool(JCR *jcr)
 {
-   if (jcr->dcr->spool_data && jcr->dcr->spool_fd >= 0) {
+   if (jcr->dcr->spooling) {
+      Dmsg0(100, "Data spooling discarded\n");
       return close_data_spool_file(jcr);
    }
    return 1;
@@ -54,11 +73,11 @@ int discard_data_spool(JCR *jcr)
 int commit_data_spool(JCR *jcr)
 {
    bool stat;
-   if (jcr->dcr->spool_data && jcr->dcr->spool_fd >= 0) {
-      lock_device(jcr->dcr->dev);
+   if (jcr->dcr->spooling) {
+      Dmsg0(100, "Committing spooled data\n");
       stat = despool_data(jcr->dcr);
-      unlock_device(jcr->dcr->dev);
       if (!stat) {
+         Dmsg1(000, "Bad return from despool WroteVol=%d\n", jcr->dcr->WroteVol);
         close_data_spool_file(jcr);
         return 0;
       }
@@ -88,6 +107,7 @@ static int open_data_spool_file(JCR *jcr)
       free_pool_memory(name);
       return 0;
     }
+    Dmsg1(100, "Created spool file: %s\n", name);
     free_pool_memory(name);
     return 1;
 }
@@ -99,50 +119,116 @@ static int close_data_spool_file(JCR *jcr)
     make_unique_data_spool_filename(jcr, &name);
     close(jcr->dcr->spool_fd);
     jcr->dcr->spool_fd = -1;
+    jcr->dcr->spooling = false;
     unlink(name);
+    Dmsg1(100, "Deleted spool file: %s\n", name);
     free_pool_memory(name);
     return 1;
 }
 
 static bool despool_data(DCR *dcr)
 {
-   DEVICE *sdev;
-   DCR *sdcr;
+   DEVICE *rdev;
+   DCR *rdcr;
    dcr->spooling = false;
    bool ok = true;
-   DEV_BLOCK *block = dcr->block;
+   DEV_BLOCK *block;
    JCR *jcr = dcr->jcr;
+   int stat;
 
+// lock_device(dcr->dev);
+   Dmsg0(100, "Despooling data\n");
    /* Set up a dev structure to read */
-   sdev = (DEVICE *)malloc(sizeof(DEVICE));
-   memset(sdev, 0, sizeof(DEVICE));
-   sdev->fd = dcr->spool_fd;
-   lseek(sdev->fd, 0, SEEK_SET); /* rewind */
-   sdcr = new_dcr(jcr, sdev);
+   rdev = (DEVICE *)malloc(sizeof(DEVICE));
+   memset(rdev, 0, sizeof(DEVICE));
+   rdev->dev_name = get_memory(strlen("spool")+1);
+   strcpy(rdev->dev_name, "spool");
+   rdev->errmsg = get_pool_memory(PM_EMSG);
+   *rdev->errmsg = 0;
+   rdcr = new_dcr(NULL, rdev);
+   rdcr->spool_fd = dcr->spool_fd; 
+   rdcr->jcr = jcr;                  /* set a valid jcr */
+   block = rdcr->block;
+   lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
    for ( ; ok; ) {
       if (job_canceled(jcr)) {
         ok = false;
         break;
       }
-      if (!read_block_from_dev(jcr, sdev, block, CHECK_BLOCK_NUMBERS)) {
-        if (dev_state(sdev, ST_EOT)) {
-           break;
-        }
-        ok = false;
+      stat = read_block_from_spool_file(rdcr, block);
+      if (stat == RB_EOT) {
         break;
-      }
-      if (!write_block_to_dev(dcr, block)) {
+      } else if (stat == RB_ERROR) {
         ok = false;
         break;
       }
+      ok = write_block_to_device(dcr, block);
+      Dmsg3(100, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
    }
-   lseek(sdev->fd, 0, SEEK_SET); /* rewind */
-   if (ftruncate(sdev->fd, 0) != 0) {
+   lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
+   if (ftruncate(rdcr->spool_fd, 0) != 0) {
+      Dmsg1(000, "Bad return from ftruncate. ERR=%s\n", strerror(errno));
       ok = false;
    }
+   free_memory(rdev->dev_name);
+   free_pool_memory(rdev->errmsg);
+   free(rdev);
+   rdcr->jcr = NULL;
+   free_dcr(rdcr);
+// unlock_device(dcr->dev);
    return ok;
 }
 
+/*
+ * Read a block from the spool file
+ * 
+ *  Returns RB_OK on success
+ *         RB_EOT when file done
+ *         RB_ERROR on error
+ */
+static int read_block_from_spool_file(DCR *dcr, DEV_BLOCK *block)
+{
+   uint32_t rlen;
+   ssize_t stat;
+   spool_hdr hdr;
+
+   rlen = sizeof(hdr);
+   stat = read(dcr->spool_fd, (char *)&hdr, (size_t)rlen);
+   if (stat == 0) {
+      Dmsg0(100, "EOT on spool read.\n");
+      return RB_EOT;
+   } else if (stat != (ssize_t)rlen) {
+      if (stat == -1) {
+         Jmsg(dcr->jcr, M_FATAL, 0, "Spool read error. ERR=%s\n", strerror(errno));
+      } else {
+         Dmsg2(000, "Spool read error. Wanted %u bytes, got %u\n", rlen, stat);
+         Jmsg2(dcr->jcr, M_FATAL, 0, "Spool read error. Wanted %u bytes, got %u\n", rlen, stat);
+      }
+      return RB_ERROR;
+   }
+   rlen = hdr.len;
+   if (rlen > block->buf_len) {
+      Dmsg2(000, "Spool block too big. Max %u bytes, got %u\n", block->buf_len, rlen);
+      Jmsg2(dcr->jcr, M_FATAL, 0, "Spool block too big. Max %u bytes, got %u\n", block->buf_len, rlen);
+      return RB_ERROR;
+   }
+   stat = read(dcr->spool_fd, (char *)block->buf, (size_t)rlen);
+   if (stat != (ssize_t)rlen) {
+      Dmsg2(000, "Spool read error. Wanted %u bytes, got %u\n", rlen, stat);
+      Jmsg2(dcr->jcr, M_FATAL, 0, "Spool read error. Wanted %u bytes, got %u\n", rlen, stat);
+      return RB_ERROR;
+   }
+   /* Setup write pointers */
+   block->binbuf = rlen;
+   block->bufp = block->buf + block->binbuf;
+   block->FirstIndex = hdr.FirstIndex;
+   block->LastIndex = hdr.LastIndex;
+   block->VolSessionId = dcr->jcr->VolSessionId;
+   block->VolSessionTime = dcr->jcr->VolSessionTime;
+   Dmsg2(400, "Read block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
+   return RB_OK;
+}
+
 /*
  * Write a block to the spool file
  *
@@ -153,44 +239,35 @@ bool write_block_to_spool_file(DCR *dcr, DEV_BLOCK *block)
 {
    ssize_t stat = 0;
    uint32_t wlen;                    /* length to write */
-   DEVICE *dev = dcr->dev;
    int retry = 0;
+   spool_hdr hdr;   
 
    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
 
-   wlen = block->binbuf;
-   if (wlen <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
+   if (block->binbuf <= WRITE_BLKHDR_LENGTH) { /* Does block have data in it? */
       Dmsg0(100, "return write_block_to_dev no data to write\n");
       return true;
    }
-   /* 
-    * Clear to the end of the buffer if it is not full,
-    *  and on tape devices, apply min and fixed blocking.
-    */
-   if (wlen != block->buf_len) {
-      uint32_t blen;                 /* current buffer length */
-
-      Dmsg2(200, "binbuf=%d buf_len=%d\n", block->binbuf, block->buf_len);
-      blen = wlen;
-
-      /* Adjust write size to min/max for tapes only */
-      if (dev->state & ST_TAPE) {
-        if (wlen < dev->min_block_size) {
-           wlen =  ((dev->min_block_size + TAPE_BSIZE - 1) / TAPE_BSIZE) * TAPE_BSIZE;
-        }
-        /* check for fixed block size */
-        if (dev->min_block_size == dev->max_block_size) {
-           wlen = block->buf_len;    /* fixed block size already rounded */
-        }
+
+   hdr.FirstIndex = block->FirstIndex;
+   hdr.LastIndex = block->LastIndex;
+   hdr.len = block->binbuf;
+   wlen = sizeof(hdr);
+write_hdr_again:
+   stat = write(dcr->spool_fd, (char*)&hdr, (size_t)wlen);
+   if (stat != (ssize_t)wlen) {
+      if (!despool_data(dcr)) {
+        return false;
       }
-      if (wlen-blen > 0) {
-        memset(block->bufp, 0, wlen-blen); /* clear garbage */
+      if (retry++ > 1) {
+        return false;
       }
-   }  
+      goto write_hdr_again;
+   }
 
-   ser_block_header(block);
 
-   Dmsg1(300, "Write block of %u bytes\n", wlen);      
+   wlen = block->binbuf;
+   Dmsg2(300, "Wrote block FI=%d LI=%d\n", block->FirstIndex, block->LastIndex);
 write_again:
    stat = write(dcr->spool_fd, block->buf, (size_t)wlen);
    if (stat != (ssize_t)wlen) {