]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/dird/msgchan.c
This commit was manufactured by cvs2svn to create tag
[bacula/bacula] / bacula / src / dird / msgchan.c
index faaaceb3e7284f9d39f5051ff6f73e7cd21fbcf4..3ed70c677795889f8d79efc6d2c76897d46672fb 100644 (file)
 #include "bacula.h"
 #include "dird.h"
 
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+
 /* Commands sent to Storage daemon */
-static char jobcmd[]     = "JobId=%d job=%s job_name=%s client_name=%s "
+static char jobcmd[]     = "JobId=%s job=%s job_name=%s client_name=%s "
    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
@@ -49,7 +51,7 @@ static char OK_device[]  = "3000 OK use device device=%s\n";
 /* Storage Daemon requests */
 static char Job_start[]  = "3010 Job %127s start\n";
 static char Job_end[]    =
-   "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%lld\n";
+   "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n";
 
 /* Forward referenced functions */
 extern "C" void *msg_thread(void *arg);
@@ -126,28 +128,41 @@ bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
    BSOCK *sd;
    char auth_key[100];
    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
+   POOL_MEM job_name, client_name, fileset_name;
    int copy = 0;
    int stripe = 0;
+   char ed1[30];
 
    sd = jcr->store_bsock;
    /*
     * Now send JobId and permissions, and get back the authorization key.
     */
-   bash_spaces(jcr->job->hdr.name);
-   bash_spaces(jcr->client->hdr.name);
-   bash_spaces(jcr->fileset->hdr.name);
+   pm_strcpy(job_name, jcr->job->hdr.name);
+   bash_spaces(job_name);
+   pm_strcpy(client_name, jcr->client->hdr.name);
+   bash_spaces(client_name);
+   pm_strcpy(fileset_name, jcr->fileset->hdr.name);
+   bash_spaces(fileset_name);
    if (jcr->fileset->MD5[0] == 0) {
       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
    }
-   bnet_fsend(sd, jobcmd, jcr->JobId, jcr->Job, jcr->job->hdr.name,
-              jcr->client->hdr.name, jcr->JobType, jcr->JobLevel,
-              jcr->fileset->hdr.name, !jcr->pool->catalog_files,
+   /* If rescheduling, cancel the previous incarnation of this job
+    *  with the SD, which might be waiting on the FD connection.
+    *  If we do not cancel it the SD will not accept a new connection
+    *  for the same jobid.
+    */
+   if (jcr->reschedule_count) {
+      bnet_fsend(sd, "cancel Job=%s\n", jcr->Job);
+      while (bnet_recv(sd) >= 0)
+         { }
+   } 
+   bnet_fsend(sd, jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job, 
+              job_name.c_str(), client_name.c_str(), 
+              jcr->JobType, jcr->JobLevel,
+              fileset_name.c_str(), !jcr->pool->catalog_files,
               jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, 
               jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
    Dmsg1(100, ">stored: %s\n", sd->msg);
-   unbash_spaces(jcr->job->hdr.name);
-   unbash_spaces(jcr->client->hdr.name);
-   unbash_spaces(jcr->fileset->hdr.name);
    if (bget_dirmsg(sd) > 0) {
        Dmsg1(100, "<stored: %s", sd->msg);
        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
@@ -198,19 +213,15 @@ bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
             Dmsg1(100, ">stored: %s", sd->msg);
          }
          bnet_sig(sd, BNET_EOD);            /* end of Devices */
-      }
-      bnet_sig(sd, BNET_EOD);            /* end of Storages */
-      if (bget_dirmsg(sd) > 0) {
-         Dmsg1(100, "<stored: %s", sd->msg);
-         /* ****FIXME**** save actual device name */
-         ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
-      } else {
-         POOL_MEM err_msg;
-         pm_strcpy(err_msg, sd->msg); /* save message */
-         Jmsg(jcr, M_FATAL, 0, _("\n"
-            "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
-            device_name.c_str(), err_msg.c_str()/* sd->msg */);
-         ok = false;
+         bnet_sig(sd, BNET_EOD);            /* end of Storages */
+         if (bget_dirmsg(sd) > 0) {
+            Dmsg1(100, "<stored: %s", sd->msg);
+            /* ****FIXME**** save actual device name */
+            ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
+         } else {
+            ok = false;
+         }
+         break;
       }
    }
 
@@ -233,19 +244,28 @@ bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
             Dmsg1(100, ">stored: %s", sd->msg);
          }
          bnet_sig(sd, BNET_EOD);            /* end of Devices */
+         bnet_sig(sd, BNET_EOD);            /* end of Storages */
+         if (bget_dirmsg(sd) > 0) {
+            Dmsg1(100, "<stored: %s", sd->msg);
+            /* ****FIXME**** save actual device name */
+            ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
+         } else {
+            ok = false;
+         }
+         break;
       }
-      bnet_sig(sd, BNET_EOD);            /* end of Storages */
-      if (bget_dirmsg(sd) > 0) {
-         Dmsg1(100, "<stored: %s", sd->msg);
-         /* ****FIXME**** save actual device name */
-         ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
-      } else {
-         POOL_MEM err_msg;
+   }
+   if (!ok) {
+      POOL_MEM err_msg;
+      if (sd->msg[0]) {
          pm_strcpy(err_msg, sd->msg); /* save message */
          Jmsg(jcr, M_FATAL, 0, _("\n"
-            "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
-            device_name.c_str(), err_msg.c_str()/* sd->msg */);
-         ok = false;
+              "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
+              device_name.c_str(), err_msg.c_str()/* sd->msg */);
+      } else { 
+         Jmsg(jcr, M_FATAL, 0, _("\n"
+              "     Storage daemon didn't accept Device \"%s\" command.\n"), 
+              device_name.c_str());
       }
    }
    return ok;
@@ -260,34 +280,30 @@ int start_storage_daemon_message_thread(JCR *jcr)
    int status;
    pthread_t thid;
 
-   P(jcr->mutex);
-   jcr->use_count++;                  /* mark in use by msg thread */
+   jcr->inc_use_count();              /* mark in use by msg thread */
    jcr->sd_msg_thread_done = false;
    jcr->SD_msg_chan = 0;
-   V(jcr->mutex);
    Dmsg0(100, "Start SD msg_thread.\n");
    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
       berrno be;
       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
    }
-   Dmsg0(100, "SD msg_thread started.\n");
    /* Wait for thread to start */
    while (jcr->SD_msg_chan == 0) {
       bmicrosleep(0, 50);
    }
+   Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
    return 1;
 }
 
 extern "C" void msg_thread_cleanup(void *arg)
 {
    JCR *jcr = (JCR *)arg;
-   Dmsg0(200, "End msg_thread\n");
    db_end_transaction(jcr, jcr->db);       /* terminate any open transaction */
-   P(jcr->mutex);
    jcr->sd_msg_thread_done = true;
-   pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
    jcr->SD_msg_chan = 0;
-   V(jcr->mutex);
+   pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
+   Dmsg1(100, "=== End msg_thread. use=%d\n", jcr->use_count());
    free_jcr(jcr);                     /* release jcr */
 }
 
@@ -314,9 +330,8 @@ extern "C" void *msg_thread(void *arg)
    /* Read the Storage daemon's output.
     */
    Dmsg0(100, "Start msg_thread loop\n");
-   while ((stat=bget_dirmsg(sd)) >= 0) {
-      int stat;
-      Dmsg1(3400, "<stored: %s", sd->msg);
+   while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
+      Dmsg1(400, "<stored: %s", sd->msg);
       if (sscanf(sd->msg, Job_start, Job) == 1) {
          continue;
       }
@@ -327,6 +342,7 @@ extern "C" void *msg_thread(void *arg)
          jcr->SDJobBytes = JobBytes;
          break;
       }
+      Dmsg2(400, "end loop stat=%d use=%d\n", stat, jcr->use_count());
    }
    if (is_bnet_error(sd)) {
       jcr->SDJobStatus = JS_ErrorTerminated;
@@ -339,8 +355,6 @@ void wait_for_storage_daemon_termination(JCR *jcr)
 {
    int cancel_count = 0;
    /* Now wait for Storage daemon to terminate our message thread */
-   set_jcr_job_status(jcr, JS_WaitSD);
-   P(jcr->mutex);
    while (!jcr->sd_msg_thread_done) {
       struct timeval tv;
       struct timezone tz;
@@ -348,18 +362,25 @@ void wait_for_storage_daemon_termination(JCR *jcr)
 
       gettimeofday(&tv, &tz);
       timeout.tv_nsec = 0;
-      timeout.tv_sec = tv.tv_sec + 10; /* wait 10 seconds */
-      Dmsg0(300, "I'm waiting for message thread termination.\n");
-      pthread_cond_timedwait(&jcr->term_wait, &jcr->mutex, &timeout);
+      timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
+      Dmsg0(400, "I'm waiting for message thread termination.\n");
+      P(mutex);
+      pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
+      V(mutex);
       if (job_canceled(jcr)) {
+         if (jcr->SD_msg_chan) {
+            jcr->store_bsock->timed_out = 1;
+            jcr->store_bsock->terminated = 1;
+            Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
+            pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
+         }
          cancel_count++;
       }
       /* Give SD 30 seconds to clean up after cancel */
-      if (cancel_count == 3) {
+      if (cancel_count == 6) {
          break;
       }
    }
-   V(jcr->mutex);
    set_jcr_job_status(jcr, JS_Terminated);
 }
 
@@ -377,7 +398,7 @@ extern "C" void *device_thread(void *arg)
    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
    for (i=0; i < MAX_TRIES; i++) {
       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
-         Dmsg0(000, "Failed connecting to SD.\n");
+         Dmsg0(900, "Failed connecting to SD.\n");
          continue;
       }
       LockRes();