]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/stored/spool.c
Make restart2-test work
[bacula/bacula] / bacula / src / stored / spool.c
index 610c0f33bf8e1dc042f8e7d8b7110edc36453e99..898700d06333424783e631228693efe87fef7d1b 100644 (file)
@@ -1,23 +1,35 @@
+/*
+   Bacula® - The Network Backup Solution
+
+   Copyright (C) 2004-2011 Free Software Foundation Europe e.V.
+
+   The main author of Bacula is Kern Sibbald, with contributions from
+   many others, a complete list can be found in the file AUTHORS.
+   This program is Free Software; you can redistribute it and/or
+   modify it under the terms of version three of the GNU Affero General Public
+   License as published by the Free Software Foundation and included
+   in the file LICENSE.
+
+   This program is distributed in the hope that it will be useful, but
+   WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   General Public License for more details.
+
+   You should have received a copy of the GNU Affero General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+   02110-1301, USA.
+
+   Bacula® is a registered trademark of Kern Sibbald.
+   The licensor of Bacula is the Free Software Foundation Europe
+   (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
+   Switzerland, email:ftf@fsfeurope.org.
+*/
 /*
  *  Spooling code
  *
  *      Kern Sibbald, March 2004
  *
- *  Version $Id$
- */
-/*
-   Copyright (C) 2004-2005 Kern Sibbald
-
-   This program is free software; you can redistribute it and/or
-   modify it under the terms of the GNU General Public License
-   version 2 as amended with additional clauses defined in the
-   file LICENSE in the main source directory.
-
-   This program is distributed in the hope that it will be useful,
-   but WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
-   the file LICENSE for additional details.
-
  */
 
 #include "bacula.h"
@@ -62,27 +74,36 @@ enum {
    RB_OK
 };
 
-void list_spool_stats(BSOCK *bs)
+void list_spool_stats(void sendit(const char *msg, int len, void *sarg), void *arg)
 {
    char ed1[30], ed2[30];
+   POOL_MEM msg(PM_MESSAGE);
+   int len;
+
+   len = Mmsg(msg, _("Spooling statistics:\n"));
+
    if (spool_stats.data_jobs || spool_stats.max_data_size) {
-      bnet_fsend(bs, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
+      len = Mmsg(msg, _("Data spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes/job.\n"),
          spool_stats.data_jobs, edit_uint64_with_commas(spool_stats.data_size, ed1),
          spool_stats.total_data_jobs,
          edit_uint64_with_commas(spool_stats.max_data_size, ed2));
+
+      sendit(msg.c_str(), len, arg);
    }
    if (spool_stats.attr_jobs || spool_stats.max_attr_size) {
-      bnet_fsend(bs, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
+      len = Mmsg(msg, _("Attr spooling: %u active jobs, %s bytes; %u total jobs, %s max bytes.\n"),
          spool_stats.attr_jobs, edit_uint64_with_commas(spool_stats.attr_size, ed1),
          spool_stats.total_attr_jobs,
          edit_uint64_with_commas(spool_stats.max_attr_size, ed2));
+   
+      sendit(msg.c_str(), len, arg);
    }
 }
 
 bool begin_data_spool(DCR *dcr)
 {
    bool stat = true;
-   if (dcr->jcr->spool_data) {
+   if (!dcr->dev->is_dvd() && dcr->jcr->spool_data) {
       Dmsg0(100, "Turning on data spooling\n");
       dcr->spool_data = true;
       stat = open_data_spool_file(dcr);
@@ -114,7 +135,7 @@ bool commit_data_spool(DCR *dcr)
       Dmsg0(100, "Committing spooled data\n");
       stat = despool_data(dcr, true /*commit*/);
       if (!stat) {
-         Pmsg1(000, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
+         Dmsg1(100, _("Bad return from despool WroteVol=%d\n"), dcr->WroteVol);
          close_data_spool_file(dcr);
          return false;
       }
@@ -131,8 +152,8 @@ static void make_unique_data_spool_filename(DCR *dcr, POOLMEM **name)
    } else {
       dir = working_directory;
    }
-   Mmsg(name, "%s/%s.data.%s.%s.spool", dir, my_name, dcr->jcr->Job, 
-        dcr->device->hdr.name);
+   Mmsg(name, "%s/%s.data.%u.%s.%s.spool", dir, my_name, dcr->jcr->JobId,
+        dcr->jcr->Job, dcr->device->hdr.name);
 }
 
 
@@ -148,7 +169,7 @@ static bool open_data_spool_file(DCR *dcr)
    } else {
       berrno be;
       Jmsg(dcr->jcr, M_FATAL, 0, _("Open data spool file %s failed: ERR=%s\n"), name,
-           be.strerror());
+           be.bstrerror());
       free_pool_memory(name);
       return false;
    }
@@ -184,6 +205,10 @@ static bool close_data_spool_file(DCR *dcr)
 
 static const char *spool_name = "*spool*";
 
+/*
+ * NB! This routine locks the device, but if committing will
+ *     not unlock it. If not committing, it will be unlocked.
+ */
 static bool despool_data(DCR *dcr, bool commit)
 {
    DEVICE *rdev;
@@ -193,19 +218,40 @@ static bool despool_data(DCR *dcr, bool commit)
    JCR *jcr = dcr->jcr;
    int stat;
    char ec1[50];
+   BSOCK *dir = jcr->dir_bsock;
 
    Dmsg0(100, "Despooling data\n");
+   if (jcr->dcr->job_spool_size == 0) {
+      Jmsg(jcr, M_WARNING, 0, _("Despooling zero bytes. Your disk is probably FULL!\n"));
+   }
+
+   /*
+    * Commit means that the job is done, so we commit, otherwise, we
+    *  are despooling because of user spool size max or some error  
+    *  (e.g. filesystem full).
+    */
    if (commit) {
-      Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume. Despooling %s bytes ...\n"),
+      Jmsg(jcr, M_INFO, 0, _("Committing spooled data to Volume \"%s\". Despooling %s bytes ...\n"),
+         jcr->dcr->VolumeName,
          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
-   }
-   else {
+      set_jcr_job_status(jcr, JS_DataCommitting);
+   else {
       Jmsg(jcr, M_INFO, 0, _("Writing spooled data to Volume. Despooling %s bytes ...\n"),
          edit_uint64_with_commas(jcr->dcr->job_spool_size, ec1));
+      set_jcr_job_status(jcr, JS_DataDespooling);
    }
+   set_jcr_job_status(jcr, JS_DataDespooling);
+   dir_send_job_status(jcr);
+   dcr->despool_wait = true;
    dcr->spooling = false;
-   lock_device(dcr->dev);
-   dcr->dev_locked = true;
+   /*
+    * We work with device blocked, but not locked so that
+    *  other threads -- e.g. reservations can lock the device
+    *  structure.
+    */
+   dcr->dblock(BST_DESPOOLING);
+   dcr->despool_wait = false;
+   dcr->despooling = true;
 
    /*
     * This is really quite kludgy and should be fixed some time.
@@ -221,15 +267,23 @@ static bool despool_data(DCR *dcr, bool commit)
    rdev->max_block_size = dcr->dev->max_block_size;
    rdev->min_block_size = dcr->dev->min_block_size;
    rdev->device = dcr->dev->device;
-   rdcr = new_dcr(NULL, rdev);
+   rdcr = new_dcr(jcr, NULL, rdev);
    rdcr->spool_fd = dcr->spool_fd;
-   rdcr->jcr = jcr;                   /* set a valid jcr */
    block = dcr->block;                /* save block */
    dcr->block = rdcr->block;          /* make read and write block the same */
 
    Dmsg1(800, "read/write block size = %d\n", block->buf_len);
    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
 
+#if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
+   posix_fadvise(rdcr->spool_fd, 0, 0, POSIX_FADV_WILLNEED);
+#endif
+
+   /* Add run time, to get current wait time */
+   int32_t despool_start = time(NULL) - jcr->run_time;
+
+   set_new_file_parameters(dcr);
+
    for ( ; ok; ) {
       if (job_canceled(jcr)) {
          ok = false;
@@ -245,19 +299,53 @@ static bool despool_data(DCR *dcr, bool commit)
       ok = write_block_to_device(dcr);
       if (!ok) {
          Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"),
-               dcr->dev->print_name(), strerror_dev(dcr->dev));
+               dcr->dev->print_name(), dcr->dev->bstrerror());
+         Dmsg2(000, "Fatal append error on device %s: ERR=%s\n",
+               dcr->dev->print_name(), dcr->dev->bstrerror());
       }
       Dmsg3(800, "Write block ok=%d FI=%d LI=%d\n", ok, block->FirstIndex, block->LastIndex);
    }
+
+   /*
+    * If this Job is incomplete, we need to backup the FileIndex
+    *  to the last correctly saved file so that the JobMedia
+    *  LastIndex is correct.
+    */
+   if (jcr->is_JobStatus(JS_Incomplete)) {
+      dcr->VolLastIndex = dir->get_FileIndex();
+      Dmsg1(100, "======= Set FI=%ld\n", dir->get_FileIndex());
+   }
+
+   if (!dir_create_jobmedia_record(dcr)) {
+      Jmsg2(jcr, M_FATAL, 0, _("Could not create JobMedia record for Volume=\"%s\" Job=%s\n"),
+         dcr->getVolCatName(), jcr->Job);
+   }
+   /* Set new file/block parameters for current dcr */
+   set_new_file_parameters(dcr);
+
+   /*
+    * Subtracting run_time give us elapsed time - wait_time since 
+    * we started despooling. Note, don't use time_t as it is 32 or 64
+    * bits depending on the OS and doesn't edit with %d
+    */
+   int32_t despool_elapsed = time(NULL) - despool_start - jcr->run_time;
+
+   if (despool_elapsed <= 0) {
+      despool_elapsed = 1;
+   }
+
+   Jmsg(dcr->jcr, M_INFO, 0, _("Despooling elapsed time = %02d:%02d:%02d, Transfer rate = %s Bytes/second\n"),
+         despool_elapsed / 3600, despool_elapsed % 3600 / 60, despool_elapsed % 60,
+         edit_uint64_with_suffix(jcr->dcr->job_spool_size / despool_elapsed, ec1));
+
    dcr->block = block;                /* reset block */
 
    lseek(rdcr->spool_fd, 0, SEEK_SET); /* rewind */
    if (ftruncate(rdcr->spool_fd, 0) != 0) {
       berrno be;
       Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
-         be.strerror());
-      Pmsg1(000, _("Bad return from ftruncate. ERR=%s\n"), be.strerror());
-      ok = false;
+         be.bstrerror());
+      /* Note, try continuing despite ftruncate problem */
    }
 
    P(mutex);
@@ -275,11 +363,20 @@ static bool despool_data(DCR *dcr, bool commit)
    free_pool_memory(rdev->errmsg);
    /* Be careful to NULL the jcr and free rdev after free_dcr() */
    rdcr->jcr = NULL;
+   rdcr->dev = NULL;
    free_dcr(rdcr);
    free(rdev);
-   unlock_device(dcr->dev);
-   dcr->dev_locked = false;
    dcr->spooling = true;           /* turn on spooling again */
+   dcr->despooling = false;
+   /*
+    * Note, if committing we leave the device blocked. It will be removed in
+    *  release_device();
+    */
+   if (!commit) {
+      dcr->dev->dunblock();
+   }
+   set_jcr_job_status(jcr, JS_Running);
+   dir_send_job_status(jcr);
    return ok;
 }
 
@@ -306,7 +403,7 @@ static int read_block_from_spool_file(DCR *dcr)
       if (stat == -1) {
          berrno be;
          Jmsg(dcr->jcr, M_FATAL, 0, _("Spool header read error. ERR=%s\n"),
-              be.strerror());
+              be.bstrerror());
       } else {
          Pmsg2(000, _("Spool read error. Wanted %u bytes, got %d\n"), rlen, stat);
          Jmsg2(dcr->jcr, M_FATAL, 0, _("Spool header read error. Wanted %u bytes, got %d\n"), rlen, stat);
@@ -348,6 +445,9 @@ bool write_block_to_spool_file(DCR *dcr)
    bool despool = false;
    DEV_BLOCK *block = dcr->block;
 
+   if (job_canceled(dcr->jcr)) {
+      return false;
+   }
    ASSERT(block->binbuf == ((uint32_t) (block->bufp - block->buf)));
    if (block->binbuf <= WRITE_BLKHDR_LENGTH) {  /* Does block have data in it? */
       return true;
@@ -421,16 +521,24 @@ static bool write_spool_header(DCR *dcr)
       if (stat == -1) {
          berrno be;
          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing header to spool file. ERR=%s\n"),
-              be.strerror());
+              be.bstrerror());
       }
       if (stat != (ssize_t)sizeof(hdr)) {
+         Jmsg(dcr->jcr, M_ERROR, 0, _("Error writing header to spool file."
+              " Disk probably full. Attempting recovery. Wanted to write=%d got=%d\n"),
+              (int)stat, (int)sizeof(hdr));
          /* If we wrote something, truncate it, then despool */
          if (stat != -1) {
-            if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR) - stat) != 0) {
+#if defined(HAVE_WIN32)
+            boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
+#else
+            boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
+#endif
+            if (ftruncate(dcr->spool_fd, pos - stat) != 0) {
                berrno be;
-               Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
-                  be.strerror());
-               return false;
+               Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
+                  be.bstrerror());
+              /* Note, try continuing despite ftruncate problem */
             }
          }
          if (!despool_data(dcr, false)) {
@@ -456,19 +564,23 @@ static bool write_spool_data(DCR *dcr)
       if (stat == -1) {
          berrno be;
          Jmsg(dcr->jcr, M_FATAL, 0, _("Error writing data to spool file. ERR=%s\n"),
-              be.strerror());
+              be.bstrerror());
       }
       if (stat != (ssize_t)block->binbuf) {
          /*
           * If we wrote something, truncate it and the header, then despool
           */
          if (stat != -1) {
-            if (ftruncate(dcr->spool_fd, lseek(dcr->spool_fd, (off_t)0, SEEK_CUR)
-                      - stat - sizeof(spool_hdr)) != 0) {
+#if defined(HAVE_WIN32)
+            boffset_t   pos = _lseeki64(dcr->spool_fd, (__int64)0, SEEK_CUR);
+#else
+            boffset_t   pos = lseek(dcr->spool_fd, 0, SEEK_CUR);
+#endif
+            if (ftruncate(dcr->spool_fd, pos - stat - sizeof(spool_hdr)) != 0) {
                berrno be;
-               Jmsg(dcr->jcr, M_FATAL, 0, _("Ftruncate spool file failed: ERR=%s\n"),
-                  be.strerror());
-               return false;
+               Jmsg(dcr->jcr, M_ERROR, 0, _("Ftruncate spool file failed: ERR=%s\n"),
+                  be.bstrerror());
+               /* Note, try continuing despite ftruncate problem */
             }
          }
          if (!despool_data(dcr, false)) {
@@ -490,7 +602,7 @@ static bool write_spool_data(DCR *dcr)
 
 bool are_attributes_spooled(JCR *jcr)
 {
-   return jcr->spool_attributes && jcr->dir_bsock->spool_fd;
+   return jcr->spool_attributes && jcr->dir_bsock->m_spool_fd;
 }
 
 /*
@@ -529,23 +641,75 @@ static void update_attr_spool_size(ssize_t size)
    V(mutex);
 }
 
+static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
+{
+   Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
+      jcr->Job, fd);
+}
+
+/*
+ * Tell Director where to find the attributes spool file 
+ *  Note, if we are not on the same machine, the Director will
+ *  return an error, and the higher level routine will transmit
+ *  the data record by record -- using bsock->despool().
+ */
+static bool blast_attr_spool_file(JCR *jcr, boffset_t size)
+{
+   /* send full spool file name */
+   POOLMEM *name  = get_pool_memory(PM_MESSAGE);
+   make_unique_spool_filename(jcr, &name, jcr->dir_bsock->m_fd);
+   bash_spaces(name);
+   jcr->dir_bsock->fsend("BlastAttr Job=%s File=%s\n", jcr->Job, name);
+   free_pool_memory(name);
+   
+   if (jcr->dir_bsock->recv() <= 0) {
+      Jmsg(jcr, M_FATAL, 0, _("Network error on BlastAttributes.\n"));
+      return false;
+   }
+   
+   if (!bstrcmp(jcr->dir_bsock->msg, "1000 OK BlastAttr\n")) {
+      return false;
+   }
+   return true;
+}
+
 bool commit_attribute_spool(JCR *jcr)
 {
-   off_t size;
+   boffset_t size, data_end;
    char ec1[30];
+   char tbuf[100];
+   BSOCK *dir;
 
+   Dmsg1(100, "Commit attributes at %s\n", bstrftimes(tbuf, sizeof(tbuf),
+         (utime_t)time(NULL)));
    if (are_attributes_spooled(jcr)) {
-      if (fseeko(jcr->dir_bsock->spool_fd, 0, SEEK_END) != 0) {
+      dir = jcr->dir_bsock;
+      if (fseeko(dir->m_spool_fd, 0, SEEK_END) != 0) {
          berrno be;
          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
-              be.strerror());
+              be.bstrerror());
          goto bail_out;
       }
-      size = ftello(jcr->dir_bsock->spool_fd);
+      size = ftello(dir->m_spool_fd);
+      if (jcr->is_JobStatus(JS_Incomplete)) {
+         data_end = dir->get_data_end();
+         /* Check and truncate to last valid data_end if necssary */
+         if (size > data_end) {
+            if (ftruncate(fileno(dir->m_spool_fd), data_end) != 0) {
+               berrno be;
+               Jmsg(jcr, M_FATAL, 0, _("Truncate on attributes file failed: ERR=%s\n"),
+                    be.bstrerror());
+               goto bail_out;
+            }
+            Dmsg2(100, "=== Attrib spool truncated from %lld to %lld\n", 
+                  size, data_end);
+            size = data_end;
+         }
+      }
       if (size < 0) {
          berrno be;
          Jmsg(jcr, M_FATAL, 0, _("Fseek on attributes file failed: ERR=%s\n"),
-              be.strerror());
+              be.bstrerror());
          goto bail_out;
       }
       P(mutex);
@@ -554,35 +718,36 @@ bool commit_attribute_spool(JCR *jcr)
       }
       spool_stats.attr_size += size;
       V(mutex);
+      set_jcr_job_status(jcr, JS_AttrDespooling);
+      dir_send_job_status(jcr);
       Jmsg(jcr, M_INFO, 0, _("Sending spooled attrs to the Director. Despooling %s bytes ...\n"),
             edit_uint64_with_commas(size, ec1));
-      bnet_despool_to_bsock(jcr->dir_bsock, update_attr_spool_size, size);
-      return close_attr_spool_file(jcr, jcr->dir_bsock);
+
+      if (!blast_attr_spool_file(jcr, size)) {
+         /* Can't read spool file from director side,
+          * send content over network.
+          */
+         dir->despool(update_attr_spool_size, size);
+      }
+      return close_attr_spool_file(jcr, dir);
    }
    return true;
 
 bail_out:
-   close_attr_spool_file(jcr, jcr->dir_bsock);
+   close_attr_spool_file(jcr, dir);
    return false;
 }
 
-static void make_unique_spool_filename(JCR *jcr, POOLMEM **name, int fd)
-{
-   Mmsg(name, "%s/%s.attr.%s.%d.spool", working_directory, my_name,
-      jcr->Job, fd);
-}
-
-
-bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
+static bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
 {
    POOLMEM *name  = get_pool_memory(PM_MESSAGE);
 
-   make_unique_spool_filename(jcr, &name, bs->fd);
-   bs->spool_fd = fopen(name, "w+");
-   if (!bs->spool_fd) {
+   make_unique_spool_filename(jcr, &name, bs->m_fd);
+   bs->m_spool_fd = fopen(name, "w+b");
+   if (!bs->m_spool_fd) {
       berrno be;
       Jmsg(jcr, M_FATAL, 0, _("fopen attr spool file %s failed: ERR=%s\n"), name,
-           be.strerror());
+           be.bstrerror());
       free_pool_memory(name);
       return false;
    }
@@ -593,11 +758,15 @@ bool open_attr_spool_file(JCR *jcr, BSOCK *bs)
    return true;
 }
 
-bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
+static bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
 {
    POOLMEM *name;
 
-   if (!bs->spool_fd) {
+   char tbuf[100];
+
+   Dmsg1(100, "Close attr spool file at %s\n", bstrftimes(tbuf, sizeof(tbuf),
+         (utime_t)time(NULL)));
+   if (!bs->m_spool_fd) {
       return true;
    }
    name = get_pool_memory(PM_MESSAGE);
@@ -605,11 +774,11 @@ bool close_attr_spool_file(JCR *jcr, BSOCK *bs)
    spool_stats.attr_jobs--;
    spool_stats.total_attr_jobs++;
    V(mutex);
-   make_unique_spool_filename(jcr, &name, bs->fd);
-   fclose(bs->spool_fd);
+   make_unique_spool_filename(jcr, &name, bs->m_fd);
+   fclose(bs->m_spool_fd);
    unlink(name);
    free_pool_memory(name);
-   bs->spool_fd = NULL;
-   bs->spool = false;
+   bs->m_spool_fd = NULL;
+   bs->clear_spooling();
    return true;
 }