]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/stored/fd_cmds.c
Big backport from Enterprise
[bacula/bacula] / bacula / src / stored / fd_cmds.c
index 1b6bfe8787b583256f120ea9d7bddfac61ef95d2..c8ad493f75d07a5414d2dff3b97be3bc525cf8f8 100644 (file)
@@ -11,7 +11,7 @@
    Public License, v3.0 ("AGPLv3") and some additional permissions and
    terms pursuant to its AGPLv3 Section 7.
 
-   This notice must be preserved when any source code is 
+   This notice must be preserved when any source code is
    conveyed and/or propagated.
 
    Bacula(R) is a registered trademark of Kern Sibbald.
@@ -26,8 +26,6 @@
  *  then when the Storage daemon receives a proper connection from
  *  the File daemon, control is passed here to handle the
  *  subsequent File daemon commands.
- *
- *
  */
 
 #include "bacula.h"
@@ -41,6 +39,7 @@ extern STORES *me;
 
 /* Static variables */
 static char ferrmsg[]      = "3900 Invalid command\n";
+static char OK_data[]      = "3000 OK data\n";
 
 /* Imported functions */
 extern bool do_append_data(JCR *jcr);
@@ -55,6 +54,8 @@ static bool append_end_session(JCR *jcr);
 static bool read_open_session(JCR *jcr);
 static bool read_data_cmd(JCR *jcr);
 static bool read_close_session(JCR *jcr);
+static bool read_control_cmd(JCR *jcr);
+static bool sd_testnetwork_cmd(JCR *jcr);
 
 /* Exported function */
 bool get_bootstrap_file(JCR *jcr, BSOCK *bs);
@@ -75,6 +76,8 @@ static struct s_cmds fd_cmds[] = {
    {"read open",    read_open_session},
    {"read data",    read_data_cmd},
    {"read close",   read_close_session},
+   {"read control", read_control_cmd},
+   {"testnetwork",  sd_testnetwork_cmd},
    {NULL,           NULL}                  /* list terminator */
 };
 
@@ -93,7 +96,7 @@ static char ERROR_append[]    = "3903 Error append data: %s\n";
 /* Information sent to the Director */
 static char Job_start[] = "3010 Job %s start\n";
 char Job_end[] =
-   "3099 Job %s end JobStatus=%d JobFiles=%d JobBytes=%s JobErrors=%u\n";
+   "3099 Job %s end JobStatus=%d JobFiles=%d JobBytes=%s JobErrors=%u ErrMsg=%s\n";
 
 /*
  * Run a Client Job -- Client already authorized
@@ -116,6 +119,9 @@ void run_job(JCR *jcr)
    jcr->start_time = time(NULL);
    jcr->run_time = jcr->start_time;
    jcr->sendJobStatus(JS_Running);
+
+   /* TODO: Remove when the new match_all is well tested */
+   jcr->use_new_match_all = use_new_match_all;
    /*
     * A migrate or copy job does both a restore (read_data) and
     *   a backup (append_data).
@@ -135,7 +141,16 @@ void run_job(JCR *jcr)
       append_end_session(jcr);
    } else if (jcr->is_JobType(JT_MIGRATE) || jcr->is_JobType(JT_COPY)) {
       jcr->session_opened = true;
-      read_data_cmd(jcr);
+      /* send "3000 OK data" now to avoid a dead lock, the other side is also
+       * waiting for one. The old peace of code was reading the "3000 OK" reply
+       * at the end of the backup (not really appropriate).
+       * dedup need duplex communication with the other side and need the
+       * "3000 OK" to be out of the socket, and be handle here by the right
+       * peace of code */
+      Dmsg0(215, "send OK_data\n");
+      jcr->file_bsock->fsend(OK_data);
+      jcr->is_ok_data_sent = true;
+      Dmsg1(050, "Do: read_data_cmd file_bsock=%p\n", jcr->file_bsock);
       Dmsg0(050, "Do: receive for 3000 OK data then read\n");
       if (!response(jcr, jcr->file_bsock, "3000 OK data\n", "Data received")) {
          Dmsg1(050, "Expect 3000 OK data, got: %s", jcr->file_bsock->msg);
@@ -143,6 +158,7 @@ void run_job(JCR *jcr)
          jcr->file_bsock->signal(BNET_EOD);
          goto bail_out;
       }
+      read_data_cmd(jcr);
       jcr->file_bsock->signal(BNET_EOD);
    } else {
       /* Either a Backup or Restore job */
@@ -156,9 +172,11 @@ bail_out:
    jcr->setJobStatus(JS_Terminated);
    generate_daemon_event(jcr, "JobEnd");
    generate_plugin_event(jcr, bsdEventJobEnd);
+   bash_spaces(jcr->StatusErrMsg);
    dir->fsend(Job_end, jcr->Job, jcr->JobStatus, jcr->JobFiles,
-      edit_uint64(jcr->JobBytes, ec1), jcr->JobErrors);
+              edit_uint64(jcr->JobBytes, ec1), jcr->JobErrors, jcr->StatusErrMsg);
    Dmsg1(100, "==== %s", dir->msg);
+   unbash_spaces(jcr->StatusErrMsg);
    dir->signal(BNET_EOD);             /* send EOD to Director daemon */
    free_plugins(jcr);                 /* release instantiated plugins */
    garbage_collect_memory_pool();
@@ -195,11 +213,14 @@ void do_client_commands(JCR *jcr)
             if (!fd_cmds[i].func(jcr)) {    /* do command */
                /* Note fd->msg command may be destroyed by comm activity */
                if (!job_canceled(jcr)) {
+                  strip_trailing_junk(fd->msg);
                   if (jcr->errmsg[0]) {
-                     Jmsg1(jcr, M_FATAL, 0, _("Command error with FD, hanging up. ERR=%s\n"),
-                           jcr->errmsg);
+                     strip_trailing_junk(jcr->errmsg);
+                     Jmsg2(jcr, M_FATAL, 0, _("Command error with FD msg=\"%s\", SD hanging up. ERR=%s\n"),
+                           fd->msg, jcr->errmsg);
                   } else {
-                     Jmsg0(jcr, M_FATAL, 0, _("Command error with FD, hanging up.\n"));
+                     Jmsg1(jcr, M_FATAL, 0, _("Command error with FD msg=\"%s\", SD hanging up.\n"),
+                        fd->msg);
                   }
                   jcr->setJobStatus(JS_ErrorTerminated);
                }
@@ -251,7 +272,7 @@ static bool append_end_session(JCR *jcr)
 {
    BSOCK *fd = jcr->file_bsock;
 
-   Dmsg1(120, ">filed: %s", fd->msg);
+   Dmsg1(120, "store<file: %s", fd->msg);
    if (!jcr->session_opened) {
       pm_strcpy(jcr->errmsg, _("Attempt to close non-open session.\n"));
       fd->fsend(NOT_opened);
@@ -260,6 +281,43 @@ static bool append_end_session(JCR *jcr)
    return fd->fsend(OK_end);
 }
 
+/*
+ * Test the FD/SD connectivity
+ */
+static bool sd_testnetwork_cmd(JCR *jcr)
+{
+   BSOCK *fd = jcr->file_bsock;
+   int64_t nb=0;
+   bool can_compress, ok=true;
+
+   if (sscanf(fd->msg, "testnetwork bytes=%lld", &nb) != 1) {
+      return false;
+   }
+   /* We disable the comline compression for this test */
+   can_compress = fd->can_compress();
+   fd->clear_compress();
+
+   /* First, get data from the FD */
+   while (fd->recv() > 0) { }
+
+   /* Then, send back data to the FD */
+   memset(fd->msg, 0xBB, sizeof_pool_memory(fd->msg));
+   fd->msglen = sizeof_pool_memory(fd->msg);
+
+   while(nb > 0 && ok) {
+      if (nb < fd->msglen) {
+         fd->msglen = nb;
+      }
+      ok = fd->send();
+      nb -= fd->msglen;
+   }
+   fd->signal(BNET_EOD);
+
+   if (can_compress) {
+      fd->set_compress();
+   }
+   return true;
+}
 
 /*
  * Append Open session command
@@ -305,6 +363,7 @@ static bool append_close_session(JCR *jcr)
    Dmsg1(120, ">filed: %s", fd->msg);
 
    fd->signal(BNET_EOD);              /* send EOD to File daemon */
+
    jcr->session_opened = false;
    return true;
 }
@@ -373,6 +432,19 @@ static bool read_open_session(JCR *jcr)
    return true;
 }
 
+static bool read_control_cmd(JCR *jcr)
+{
+   BSOCK *fd = jcr->file_bsock;
+
+   Dmsg1(120, "Read control: %s\n", fd->msg);
+   if (!jcr->session_opened) {
+      fd->fsend(NOT_opened);
+      return false;
+   }
+   jcr->interactive_session = true;
+   return true;
+}
+
 /*
  *   Read Close session command
  *      Close the read session