]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/filed/backup.c
Add SD heartbeat
[bacula/bacula] / bacula / src / filed / backup.c
index 794e26c95357eca31bc7146d56a38ce408607e9d..61fe9c79e00b05a4e228b4542533e9ee5abdfd33 100644 (file)
 
 static int save_file(FF_PKT *ff_pkt, void *pkt);
 
+static void *heartbeat_thread(void *arg)
+{
+   int32_t n;
+   JCR *jcr = (JCR *)arg;
+   BSOCK *sd, *dir;
+
+   jcr->heartbeat_id = pthread_self();
+
+   /* Get our own local copy */
+   sd = dup_bsock(jcr->store_bsock);
+   dir = dup_bsock(jcr->dir_bsock);
+
+   jcr->duped_sd = sd;
+
+   /* Hang reading the socket to the SD, and every time we get
+    *  a heartbeat, we simply send it on to the Director to
+    *  keep him alive.
+    */
+   for ( ;; ) {
+      n = bnet_recv(sd);
+      if (is_bnet_stop(sd)) {
+        break;
+      }
+      if (n == BNET_SIGNAL && sd->msglen == BNET_HEARTBEAT) {
+        bnet_sig(dir, BNET_HEARTBEAT);
+      }
+   }
+   bnet_close(sd);
+   bnet_close(dir);
+   return NULL;
+}
+
+/* Startup the heartbeat thread -- see above */
+static void start_heartbeat_monitor(JCR *jcr)
+{
+   pthread_t hbtid;
+   jcr->duped_sd = NULL;
+   pthread_create(&hbtid, NULL, heartbeat_thread, (void *)jcr);
+}
+
+/* Terminate the heartbeat thread */
+static void stop_heartbeat_monitor(JCR *jcr) 
+{
+   pthread_t hbtid = jcr->heartbeat_id;
+
+   while (jcr->duped_sd == NULL) {
+      bmicrosleep(0, 500);           /* avoid race */
+   }
+   jcr->duped_sd->timed_out = 1;      /* set timed_out to terminate read */
+
+   pthread_kill(hbtid, TIMEOUT_SIGNAL);  /* make heartbeat thread go away */
+   pthread_join(hbtid, NULL);        /* wait for him to clean up */
+}
+
 /* 
  * Find all the requested files and send them
- * to the Storage daemon.
+ * to the Storage daemon. 
+ *
+ * Note, we normally carry on a one-way
+ * conversation from this point on with the SD, simply blasting
+ * data to him.  To properly know what is going on, we
+ * also run a "heartbeat" monitor which reads the socket and
+ * reacts accordingly (at the moment it has nothing to do
+ * except echo the heartbeat to the Director).
  * 
  */
 int blast_data_to_storage_daemon(JCR *jcr, char *addr) 
@@ -67,12 +128,16 @@ int blast_data_to_storage_daemon(JCR *jcr, char *addr)
    set_find_options((FF_PKT *)jcr->ff, jcr->incremental, jcr->mtime);
    Dmsg0(110, "start find files\n");
 
+   start_heartbeat_monitor(jcr);
+
    /* Subroutine save_file() is called for each file */
    if (!find_files(jcr, (FF_PKT *)jcr->ff, save_file, (void *)jcr)) {
       stat = 0;                      /* error */
       set_jcr_job_status(jcr, JS_ErrorTerminated);
    }
 
+   stop_heartbeat_monitor(jcr);
+
    bnet_sig(sd, BNET_EOD);           /* end data connection */
 
    if (jcr->big_buf) {
@@ -175,6 +240,8 @@ static int save_file(FF_PKT *ff_pkt, void *vjcr)
       return 1;
    }
 
+   binit(&ff_pkt->bfd, jcr->use_win_backup_api);
+
    /* Open any file with data that we intend to save */
    if (ff_pkt->type != FT_LNKSAVED && (S_ISREG(ff_pkt->statp.st_mode) && 
         ff_pkt->statp.st_size > 0) || 
@@ -187,13 +254,12 @@ static int save_file(FF_PKT *ff_pkt, void *vjcr)
       }
       if (bopen(&ff_pkt->bfd, ff_pkt->fname, O_RDONLY | O_BINARY, 0) < 0) {
         ff_pkt->ff_errno = errno;
-         Jmsg(jcr, M_NOTSAVED, -1, _("     Cannot open %s: ERR=%s.\n"), ff_pkt->fname, strerror(ff_pkt->ff_errno));
+         Jmsg(jcr, M_NOTSAVED, -1, _("     Cannot open %s: ERR=%s.\n"), ff_pkt->fname, 
+             berror(&ff_pkt->bfd));
         stop_thread_timer(tid);
         return 1;
       }
       stop_thread_timer(tid);
-   } else {
-      binit(&ff_pkt->bfd);           /* mark file not opened */
    }
 
    Dmsg1(130, "bfiled: sending %s to stored\n", ff_pkt->fname);
@@ -211,7 +277,6 @@ static int save_file(FF_PKT *ff_pkt, void *vjcr)
     * Send Attributes header to Storage daemon
     *   <file-index> <stream> <info>
     */
-/* #ifndef FD_NO_SEND_TEST */
    if (!bnet_fsend(sd, "%ld %d 0", jcr->JobFiles, stream)) {
       if (is_bopen(&ff_pkt->bfd)) {
         bclose(&ff_pkt->bfd);
@@ -256,7 +321,6 @@ static int save_file(FF_PKT *ff_pkt, void *vjcr)
       return 0;
    }
    bnet_sig(sd, BNET_EOD);           /* indicate end of attributes data */
-/* #endif  */
 
    /* 
     * If the file has data, read it and send to the Storage daemon
@@ -301,7 +365,6 @@ static int save_file(FF_PKT *ff_pkt, void *vjcr)
       }
 #endif
 
-/*    #ifndef FD_NO_SEND_TEST  */
       /*
        * Send Data header to Storage daemon
        *    <file-index> <stream> <info>
@@ -312,7 +375,6 @@ static int save_file(FF_PKT *ff_pkt, void *vjcr)
         return 0;
       }
       Dmsg1(100, ">stored: datahdr %s\n", sd->msg);
-/*    #endif */
 
       if (ff_pkt->flags & FO_MD5) {
         MD5Init(&md5c);
@@ -384,7 +446,7 @@ static int save_file(FF_PKT *ff_pkt, void *vjcr)
         }
 #endif
 
-/*      #ifndef FD_NO_SEND_TEST */
+        /*       #ifndef FD_NO_SEND_TEST */
         /* Send the buffer to the Storage daemon */
         if (!sparseBlock) {
            if (ff_pkt->flags & FO_SPARSE) {
@@ -400,50 +462,45 @@ static int save_file(FF_PKT *ff_pkt, void *vjcr)
            }
         }
          Dmsg1(130, "Send data to SD len=%d\n", sd->msglen);
-/*      #endif */
+        /*       #endif */
         jcr->JobBytes += sd->msglen;   /* count bytes saved possibly compressed */
         sd->msg = msgsave;             /* restore read buffer */
 
       } /* end while read file data */
 
+
       if (sd->msglen < 0) {
          Jmsg(jcr, M_ERROR, 0, _("Read error on file %s. ERR=%s\n"),
-           ff_pkt->fname, strerror(errno));
+           ff_pkt->fname, berror(&ff_pkt->bfd));
       }
 
       bclose(&ff_pkt->bfd);             /* close file */
-/*    #ifndef FD_NO_SEND_TEST */
       if (!bnet_sig(sd, BNET_EOD)) {    /* indicate end of file data */
         set_jcr_job_status(jcr, JS_ErrorTerminated);
         return 0;
       }
-/*    #endif  */
    }
 
 
    /* Terminate any MD5 signature and send it to Storage daemon and the Director */
    if (gotMD5 && ff_pkt->flags & FO_MD5) {
       MD5Final(signature, &md5c);
-/*    #ifndef FD_NO_SEND_TEST */
       bnet_fsend(sd, "%ld %d 0", jcr->JobFiles, STREAM_MD5_SIGNATURE);
       Dmsg1(100, "bfiled>stored:header %s\n", sd->msg);
       memcpy(sd->msg, signature, 16);
       sd->msglen = 16;
       bnet_send(sd);
       bnet_sig(sd, BNET_EOD);        /* end of MD5 */
-/*    #endif */
       gotMD5 = 0;
    } else if (gotSHA1 && ff_pkt->flags & FO_SHA1) {
    /* Terminate any SHA1 signature and send it to Storage daemon and the Director */
       SHA1Final(&sha1c, signature);
-/*    #ifndef FD_NO_SEND_TEST */
       bnet_fsend(sd, "%ld %d 0", jcr->JobFiles, STREAM_SHA1_SIGNATURE);
       Dmsg1(100, "bfiled>stored:header %s\n", sd->msg);
       memcpy(sd->msg, signature, 20);
       sd->msglen = 20;
       bnet_send(sd);
       bnet_sig(sd, BNET_EOD);        /* end of SHA1 */
-/*    #endif  */
       gotMD5 = 0;
    }
    return 1;