]> git.sur5r.net Git - bacula/bacula/commitdiff
Add SD heartbeat
authorKern Sibbald <kern@sibbald.com>
Fri, 2 May 2003 19:33:24 +0000 (19:33 +0000)
committerKern Sibbald <kern@sibbald.com>
Fri, 2 May 2003 19:33:24 +0000 (19:33 +0000)
git-svn-id: https://bacula.svn.sourceforge.net/svnroot/bacula/trunk@479 91ce42f0-d328-0410-95d8-f526ca767f89

bacula/kernstodo
bacula/src/dird/getmsg.c
bacula/src/filed/backup.c
bacula/src/jcr.h
bacula/src/lib/bnet.c
bacula/src/stored/askdir.c

index 0168026ba1513aeae862a2c139a5e1dbed7e3575..40ec0450ace08831225b48a1d8336f0db5882414 100644 (file)
@@ -15,7 +15,7 @@ Testing to do: (painful)
 - blocksize recognition code.
 - multiple simultaneous Volumes
 
-For 1.30 release:
+For 1.30a release:
 - Examine Bare Metal restore problem.
 - Test multiple simultaneous Volumes
 - Document FInclude ...
@@ -23,6 +23,10 @@ For 1.30 release:
 - Figure out how to use ssh or stunnel to protect Bacula communications.
 
 After 1.30:
+- Fix command prompt in gnome-console by checking on Ready.
+- Implement HEART_BEAT while SD waiting for tapes.
+- Include RunBeforeJob and RunAfterJob output in the message
+  stream.
 - Check if Job/File retentions apply to multivolume jobs.
 - Change M_INFO to M_RESTORED for all restored files.
 - Remove subsysdir from conf files (used only in autostart scripts).
index 0676e1d092380a1d6243acfdf6773f8090fc7b27..8383a7ab5b7e139731db5c908383a07b1ae3c7af 100644 (file)
@@ -83,28 +83,28 @@ int32_t bget_msg(BSOCK *bs, int rtn)
       if (n == BNET_SIGNAL) {         /* handle signal */
         /* BNET_SIGNAL (-1) return from bnet_recv() => network signal */
         switch (bs->msglen) {
-           case BNET_EOD:            /* end of data */
-              return n;
-           case BNET_EOD_POLL:
-              bnet_fsend(bs, OK_msg);/* send response */
-              return n;              /* end of data */
-           case BNET_TERMINATE:
-              bs->terminated = 1;
-              return n;
-           case BNET_POLL:
-              bnet_fsend(bs, OK_msg); /* send response */
-              break;
-           case BNET_HEARTBEAT:
-              bnet_sig(bs, BNET_HB_RESPONSE);
-              break;
-           case BNET_STATUS:
-              /* *****FIXME***** Implement */
-               bnet_fsend(bs, "Status OK\n");
-              bnet_sig(bs, BNET_EOD);
-              break;
-           default:
-               Emsg1(M_WARNING, 0, _("bget_msg: unknown signal %d\n"), bs->msglen);
-              return n;
+        case BNET_EOD:            /* end of data */
+           return n;
+        case BNET_EOD_POLL:
+           bnet_fsend(bs, OK_msg);/* send response */
+           return n;              /* end of data */
+        case BNET_TERMINATE:
+           bs->terminated = 1;
+           return n;
+        case BNET_POLL:
+           bnet_fsend(bs, OK_msg); /* send response */
+           break;
+        case BNET_HEARTBEAT:
+        case BNET_HB_RESPONSE:
+           break;
+        case BNET_STATUS:
+           /* *****FIXME***** Implement */
+            bnet_fsend(bs, "Status OK\n");
+           bnet_sig(bs, BNET_EOD);
+           break;
+        default:
+            Emsg1(M_WARNING, 0, _("bget_msg: unknown signal %d\n"), bs->msglen);
+           return n;
         }
         continue;
       }
index 98b0988006002f8ca2b76b76f6187d3715f75bc3..61fe9c79e00b05a4e228b4542533e9ee5abdfd33 100644 (file)
 
 static int save_file(FF_PKT *ff_pkt, void *pkt);
 
+static void *heartbeat_thread(void *arg)
+{
+   int32_t n;
+   JCR *jcr = (JCR *)arg;
+   BSOCK *sd, *dir;
+
+   jcr->heartbeat_id = pthread_self();
+
+   /* Get our own local copy */
+   sd = dup_bsock(jcr->store_bsock);
+   dir = dup_bsock(jcr->dir_bsock);
+
+   jcr->duped_sd = sd;
+
+   /* Hang reading the socket to the SD, and every time we get
+    *  a heartbeat, we simply send it on to the Director to
+    *  keep him alive.
+    */
+   for ( ;; ) {
+      n = bnet_recv(sd);
+      if (is_bnet_stop(sd)) {
+        break;
+      }
+      if (n == BNET_SIGNAL && sd->msglen == BNET_HEARTBEAT) {
+        bnet_sig(dir, BNET_HEARTBEAT);
+      }
+   }
+   bnet_close(sd);
+   bnet_close(dir);
+   return NULL;
+}
+
+/* Startup the heartbeat thread -- see above */
+static void start_heartbeat_monitor(JCR *jcr)
+{
+   pthread_t hbtid;
+   jcr->duped_sd = NULL;
+   pthread_create(&hbtid, NULL, heartbeat_thread, (void *)jcr);
+}
+
+/* Terminate the heartbeat thread */
+static void stop_heartbeat_monitor(JCR *jcr) 
+{
+   pthread_t hbtid = jcr->heartbeat_id;
+
+   while (jcr->duped_sd == NULL) {
+      bmicrosleep(0, 500);           /* avoid race */
+   }
+   jcr->duped_sd->timed_out = 1;      /* set timed_out to terminate read */
+
+   pthread_kill(hbtid, TIMEOUT_SIGNAL);  /* make heartbeat thread go away */
+   pthread_join(hbtid, NULL);        /* wait for him to clean up */
+}
+
 /* 
  * Find all the requested files and send them
- * to the Storage daemon.
+ * to the Storage daemon. 
+ *
+ * Note, we normally carry on a one-way
+ * conversation from this point on with the SD, simply blasting
+ * data to him.  To properly know what is going on, we
+ * also run a "heartbeat" monitor which reads the socket and
+ * reacts accordingly (at the moment it has nothing to do
+ * except echo the heartbeat to the Director).
  * 
  */
 int blast_data_to_storage_daemon(JCR *jcr, char *addr) 
@@ -67,12 +128,16 @@ int blast_data_to_storage_daemon(JCR *jcr, char *addr)
    set_find_options((FF_PKT *)jcr->ff, jcr->incremental, jcr->mtime);
    Dmsg0(110, "start find files\n");
 
+   start_heartbeat_monitor(jcr);
+
    /* Subroutine save_file() is called for each file */
    if (!find_files(jcr, (FF_PKT *)jcr->ff, save_file, (void *)jcr)) {
       stat = 0;                      /* error */
       set_jcr_job_status(jcr, JS_ErrorTerminated);
    }
 
+   stop_heartbeat_monitor(jcr);
+
    bnet_sig(sd, BNET_EOD);           /* end data connection */
 
    if (jcr->big_buf) {
index 6092d10381203dffa928a3a77b775d69b026d961..ed4d390a5a08da3d5b773bdef107060718603539 100644 (file)
@@ -151,6 +151,7 @@ struct s_jcr {
    int replace;                       /* Replace option */
 #endif /* DIRECTOR_DAEMON */
 
+
 #ifdef FILE_DAEMON
    /* File Daemon specific part of JCR */
    uint32_t num_files_examined;       /* files examined this job */
@@ -176,8 +177,11 @@ struct s_jcr {
    uint32_t StartBlock;
    uint32_t EndBlock;
    int use_win_backup_api;            /* set to use native Win API */
+   pthread_t heartbeat_id;            /* id of heartbeat thread */
+   BSOCK *duped_sd;                   /* duped SD socket */
 #endif /* FILE_DAEMON */
 
+
 #ifdef STORAGE_DAEMON
    /* Storage Daemon specific part of JCR */
    struct s_jcr *next_dev;            /* next JCR attached to device */
index ca6bc75dd1b48aecfa21f661e9eb4f41a74b62f9..66aa09add0dab38e1b6b257fba05c00b95fcc514 100644 (file)
@@ -341,7 +341,7 @@ bnet_send(BSOCK *bsock)
         }
       } else {
          Jmsg5(bsock->jcr, M_ERROR, 0, _("Wrote %d bytes to %s:%s:%d, but only %d accepted.\n"), 
-              bsock->who, bsock->host, bsock->port, bsock->msglen, rc);
+              bsock->msglen, bsock->who, bsock->host, bsock->port, rc);
       }
       return 0;
    }
@@ -722,6 +722,12 @@ dup_bsock(BSOCK *osock)
    memcpy(bsock, osock, sizeof(BSOCK));
    bsock->msg = get_pool_memory(PM_MESSAGE);
    bsock->errmsg = get_pool_memory(PM_MESSAGE);
+   if (osock->who) {
+      bsock->who = bstrdup(osock->who);
+   }
+   if (osock->host) {
+      bsock->host = bstrdup(osock->host);
+   }
    bsock->duped = TRUE;
    return bsock;
 }
@@ -735,12 +741,9 @@ bnet_close(BSOCK *bsock)
    for ( ; bsock != NULL; bsock = next) {
       next = bsock->next;
       if (!bsock->duped) {
-//      shutdown(bsock->fd, SHUT_RDWR);
         close(bsock->fd);
-        term_bsock(bsock);
-      } else {
-        free(bsock);
       }
+      term_bsock(bsock);
    }
    return;
 }
index c0e3470a422f6b14291e25216cbc4bff83e6888d..2283834f929479eb821645dc6a63f708e0a841da 100644 (file)
@@ -30,9 +30,8 @@
 #include "stored.h"                   /* pull in Storage Deamon headers */
 
 /* Requests sent to the Director */
-static char Find_media[]    = "CatReq Job=%s FindMedia=%d\n";
+static char Find_media[]   = "CatReq Job=%s FindMedia=%d\n";
 static char Get_Vol_Info[] = "CatReq Job=%s GetVolInfo VolName=%s write=%d\n";
-
 static char Update_media[] = "CatReq Job=%s UpdateMedia VolName=%s\
  VolJobs=%u VolFiles=%u VolBlocks=%u VolBytes=%s VolMounts=%u\
  VolErrors=%u VolWrites=%u MaxVolBytes=%s EndTime=%d VolStatus=%s\
@@ -41,11 +40,8 @@ static char Update_media[] = "CatReq Job=%s UpdateMedia VolName=%s\
 static char Create_job_media[] = "CatReq Job=%s CreateJobMedia \
  FirstIndex=%u LastIndex=%u StartFile=%u EndFile=%u \
  StartBlock=%u EndBlock=%u\n";
-
-
 static char FileAttributes[] = "UpdCat Job=%s FileAttributes ";
-
-static char Job_status[]   = "3012 Job %s jobstatus %d\n";
+static char Job_status[]     = "3012 Job %s jobstatus %d\n";
 
 
 /* Responses received from the Director */
@@ -53,9 +49,10 @@ static char OK_media[] = "1000 OK VolName=%127s VolJobs=%u VolFiles=%u\
  VolBlocks=%u VolBytes=%" lld " VolMounts=%u VolErrors=%u VolWrites=%u\
  MaxVolBytes=%" lld " VolCapacityBytes=%" lld " VolStatus=%20s\
  Slot=%d MaxVolJobs=%u MaxVolFiles=%u\n";
-
 static char OK_update[] = "1000 OK UpdateMedia\n";
 
+/* Forward referenced functions */
+static int device_wait(JCR *jcr, DEVICE *dev, int wait_sec);
 
 /*
  * Send current JobStatus to Director
@@ -252,9 +249,6 @@ int dir_update_file_attributes(JCR *jcr, DEV_RECORD *rec)
  */
 int dir_ask_sysop_to_mount_next_volume(JCR *jcr, DEVICE *dev)
 {
-   struct timeval tv;
-   struct timezone tz;
-   struct timespec timeout;
    int stat = 0, jstat;
    /* ******FIXME******* put these on config variable */
    int min_wait = 60 * 60;
@@ -263,7 +257,6 @@ int dir_ask_sysop_to_mount_next_volume(JCR *jcr, DEVICE *dev)
 
    int wait_sec;
    int num_wait = 0;
-   int dev_blocked;
 
    Dmsg0(130, "enter dir_ask_sysop_to_mount_next_volume\n");
    ASSERT(dev->dev_blocked);
@@ -305,36 +298,11 @@ Please use the \"label\"  command to create a new Volume for:\n\
              jcr->media_type,
              jcr->pool_name);
       }
-      /*
-       * Wait then send message again
-       */
-      gettimeofday(&tv, &tz);
-      timeout.tv_nsec = tv.tv_usec * 1000;
-      timeout.tv_sec = tv.tv_sec + wait_sec;
 
-      P(dev->mutex);
-      dev_blocked = dev->dev_blocked;
-      dev->dev_blocked = BST_WAITING_FOR_SYSOP; /* indicate waiting for mount */
       jcr->JobStatus = jstat;
       dir_send_job_status(jcr);
 
-      for ( ;!job_canceled(jcr); ) {
-         Dmsg1(190, "I'm going to sleep on device %s\n", dev->dev_name);
-        stat = pthread_cond_timedwait(&dev->wait_next_vol, &dev->mutex, &timeout);
-        if (dev->dev_blocked == BST_WAITING_FOR_SYSOP) {
-           break;
-        }
-        /*         
-         * Someone other than us blocked the device (probably the
-         *  user via the Console program.   
-         * So, we continue waiting.
-         */
-        gettimeofday(&tv, &tz);
-        timeout.tv_nsec = 0;
-        timeout.tv_sec = tv.tv_sec + 10; /* wait 10 seconds */
-      }
-      dev->dev_blocked = dev_blocked;
-      V(dev->mutex);
+      stat = device_wait(jcr, dev, wait_sec);
 
       if (stat == ETIMEDOUT) {
         wait_sec *= 2;               /* double wait time */
@@ -402,11 +370,7 @@ int dir_ask_sysop_to_mount_volume(JCR *jcr, DEVICE *dev)
    int max_num_wait = 9;             /* 5 waits =~ 1 day, then 1 day at a time */
    int wait_sec;
    int num_wait = 0;
-   int dev_blocked;
    char *msg;
-   struct timeval tv;
-   struct timezone tz;
-   struct timespec timeout;
 
    Dmsg0(130, "enter dir_ask_sysop_to_mount_next_volume\n");
    if (!jcr->VolumeName[0]) {
@@ -427,36 +391,10 @@ int dir_ask_sysop_to_mount_volume(JCR *jcr, DEVICE *dev)
       Dmsg3(190, "Mount %s on %s for Job %s\n",
            jcr->VolumeName, jcr->dev_name, jcr->Job);
 
-      /*
-       * Wait then send message again
-       */
-      gettimeofday(&tv, &tz);
-      timeout.tv_nsec = tv.tv_usec * 1000;
-      timeout.tv_sec = tv.tv_sec + wait_sec;
-
-      P(dev->mutex);
-      dev_blocked = dev->dev_blocked;
-      dev->dev_blocked = BST_WAITING_FOR_SYSOP; /* indicate waiting for mount */
       jcr->JobStatus = JS_WaitMount;
       dir_send_job_status(jcr);
 
-      for ( ;!job_canceled(jcr); ) {
-         Dmsg1(190, "I'm going to sleep on device %s\n", dev->dev_name);
-        stat = pthread_cond_timedwait(&dev->wait_next_vol, &dev->mutex, &timeout);
-        if (dev->dev_blocked == BST_WAITING_FOR_SYSOP) {
-           break;
-        }
-        /*         
-         * Someone other than us blocked the device (probably the
-         *  user via the Console program.   
-         * So, we continue waiting.
-         */
-        gettimeofday(&tv, &tz);
-        timeout.tv_nsec = 0;
-        timeout.tv_sec = tv.tv_sec + 10; /* wait 10 seconds */
-      }
-      dev->dev_blocked = dev_blocked;
-      V(dev->mutex);
+      stat = device_wait(jcr, dev, wait_sec); /* wait on device */
 
       if (stat == ETIMEDOUT) {
         wait_sec *= 2;               /* double wait time */
@@ -495,3 +433,79 @@ int dir_ask_sysop_to_mount_volume(JCR *jcr, DEVICE *dev)
    Dmsg0(130, "leave dir_ask_sysop_to_mount_next_volume\n");
    return 1;
 }
+
+#define HB_TIME 20*60  /* send a heatbeat once every 20 minutes while waiting */
+
+static int device_wait(JCR *jcr, DEVICE *dev, int wait_sec)
+{
+   struct timeval tv;
+   struct timezone tz;
+   struct timespec timeout;
+   int dev_blocked;
+   time_t start = time(NULL);
+   time_t last_heartbeat = 0;
+   int stat = 0;
+   
+   /*
+    * Wait requested time (wait_sec).  However, we also wake up every
+    *   HB_TIME seconds and send a heartbeat to the FD and the Director
+    *   to keep stateful firewalls from closing them down while waiting
+    *   for the operator.
+    */
+   gettimeofday(&tv, &tz);
+   timeout.tv_nsec = tv.tv_usec * 1000;
+   timeout.tv_sec = tv.tv_sec + (wait_sec > HB_TIME ? HB_TIME: wait_sec);
+
+   P(dev->mutex);
+   dev_blocked = dev->dev_blocked;
+   dev->dev_blocked = BST_WAITING_FOR_SYSOP; /* indicate waiting for mount */
+
+   for ( ; !job_canceled(jcr); ) {
+      int add_wait;
+
+      Dmsg1(190, "I'm going to sleep on device %s\n", dev->dev_name);
+      stat = pthread_cond_timedwait(&dev->wait_next_vol, &dev->mutex, &timeout);
+
+      /* Note, this always triggers the first time. We want that. */
+      time_t now = time(NULL);
+      if (now - last_heartbeat >= HB_TIME) {
+        /* send heartbeats */
+        if (jcr->file_bsock) {
+           bnet_sig(jcr->file_bsock, BNET_HEARTBEAT);
+        }
+        if (jcr->dir_bsock) {
+           bnet_sig(jcr->dir_bsock, BNET_HEARTBEAT);
+        }
+        last_heartbeat = now;
+      }
+
+      /* Check if we blocked the device */
+      if (dev->dev_blocked == BST_WAITING_FOR_SYSOP) {
+        if (stat != ETIMEDOUT) {     /* we blocked the device */
+           break;                    /* on error return */
+        }
+        if (now - start >= wait_sec) {  /* on exceeding wait time return */
+           break;
+        }
+        add_wait = wait_sec - (now - start);
+        if (add_wait > HB_TIME) {
+           add_wait = HB_TIME;
+        }
+      } else {                       /* Oops someone else has it blocked now */
+        add_wait = 10;               /* hang around until he releases it */
+      }
+      /*        
+       * Note, if dev_blocked is not BST_WAITING FOR_SYSOP,
+       *  someone other than us has blocked the device (probably the
+       *  user via the Console program), so we continue waiting
+       *  until he releases the device back to us.
+       */
+      gettimeofday(&tv, &tz);
+      timeout.tv_nsec = tv.tv_usec * 1000;
+      timeout.tv_sec = tv.tv_sec + add_wait; /* additional wait */
+   }
+
+   dev->dev_blocked = dev_blocked;
+   V(dev->mutex);
+   return stat;
+}