]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/dird/jobq.c
Add more ingres code
[bacula/bacula] / bacula / src / dird / jobq.c
index d08d6bcde13be3c5d632917334ee18e6f424502d..aec1bd5724da97fd2a4682f623acc7cecbaf9b80 100644 (file)
@@ -1,12 +1,12 @@
 /*
    Bacula® - The Network Backup Solution
 
 /*
    Bacula® - The Network Backup Solution
 
-   Copyright (C) 2003-2008 Free Software Foundation Europe e.V.
+   Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
 
    The main author of Bacula is Kern Sibbald, with contributions from
    many others, a complete list can be found in the file AUTHORS.
    This program is Free Software; you can redistribute it and/or
 
    The main author of Bacula is Kern Sibbald, with contributions from
    many others, a complete list can be found in the file AUTHORS.
    This program is Free Software; you can redistribute it and/or
-   modify it under the terms of version two of the GNU General Public
+   modify it under the terms of version three of the GNU Affero General Public
    License as published by the Free Software Foundation and included
    in the file LICENSE.
 
    License as published by the Free Software Foundation and included
    in the file LICENSE.
 
@@ -15,7 +15,7 @@
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    General Public License for more details.
 
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    General Public License for more details.
 
-   You should have received a copy of the GNU General Public License
+   You should have received a copy of the GNU Affero General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
    02110-1301, USA.
    along with this program; if not, write to the Free Software
    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
    02110-1301, USA.
@@ -37,7 +37,6 @@
  *
  *  Kern Sibbald, July MMIII
  *
  *
  *  Kern Sibbald, July MMIII
  *
- *   Version $Id$
  *
  *  This code was adapted from the Bacula workq, which was
  *    adapted from "Programming with POSIX Threads", by
  *
  *  This code was adapted from the Bacula workq, which was
  *    adapted from "Programming with POSIX Threads", by
@@ -57,7 +56,6 @@ extern "C" void *sched_wait(void *arg);
 static int  start_server(jobq_t *jq);
 static bool acquire_resources(JCR *jcr);
 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
 static int  start_server(jobq_t *jq);
 static bool acquire_resources(JCR *jcr);
 static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
-static void dec_read_store(JCR *jcr);
 static void dec_write_store(JCR *jcr);
 
 /*
 static void dec_write_store(JCR *jcr);
 
 /*
@@ -172,7 +170,7 @@ void *sched_wait(void *arg)
    JCR *jcr = ((wait_pkt *)arg)->jcr;
    jobq_t *jq = ((wait_pkt *)arg)->jq;
 
    JCR *jcr = ((wait_pkt *)arg)->jcr;
    jobq_t *jq = ((wait_pkt *)arg)->jq;
 
-   set_jcr_in_tsd(jcr);
+   set_jcr_in_tsd(INVALID_JCR);
    Dmsg0(2300, "Enter sched_wait.\n");
    free(arg);
    time_t wtime = jcr->sched_time - time(NULL);
    Dmsg0(2300, "Enter sched_wait.\n");
    free(arg);
    time_t wtime = jcr->sched_time - time(NULL);
@@ -358,8 +356,10 @@ static int start_server(jobq_t *jq)
       Dmsg0(2300, "Create worker thread\n");
       /* No idle threads so create a new one */
       set_thread_concurrency(jq->max_workers + 1);
       Dmsg0(2300, "Create worker thread\n");
       /* No idle threads so create a new one */
       set_thread_concurrency(jq->max_workers + 1);
+      jq->num_workers++;
       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
          berrno be;
       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
          berrno be;
+         jq->num_workers--;
          Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat));
          return stat;
       }
          Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat));
          return stat;
       }
@@ -386,7 +386,6 @@ void *jobq_server(void *arg)
    set_jcr_in_tsd(INVALID_JCR);
    Dmsg0(2300, "Start jobq_server\n");
    P(jq->mutex);
    set_jcr_in_tsd(INVALID_JCR);
    Dmsg0(2300, "Start jobq_server\n");
    P(jq->mutex);
-   jq->num_workers++;
 
    for (;;) {
       struct timeval tv;
 
    for (;;) {
       struct timeval tv;
@@ -438,6 +437,7 @@ void *jobq_server(void *arg)
          jq->running_jobs->append(je);
 
          /* Attach jcr to this thread while we run the job */
          jq->running_jobs->append(je);
 
          /* Attach jcr to this thread while we run the job */
+         jcr->set_killable(true);
          set_jcr_in_tsd(jcr);
          Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
 
          set_jcr_in_tsd(jcr);
          Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
 
@@ -445,12 +445,13 @@ void *jobq_server(void *arg)
          V(jq->mutex);
 
          /* Call user's routine here */
          V(jq->mutex);
 
          /* Call user's routine here */
-         Dmsg2(2300, "Calling user engine for jobid=%d use=%d\n", jcr->JobId,
-            jcr->use_count());
+         Dmsg3(2300, "Calling user engine for jobid=%d use=%d stat=%c\n", jcr->JobId,
+            jcr->use_count(), jcr->JobStatus);
          jq->engine(je->jcr);
 
          /* Job finished detach from thread */
          jq->engine(je->jcr);
 
          /* Job finished detach from thread */
-         set_jcr_in_tsd(INVALID_JCR);
+         remove_jcr_from_tsd(je->jcr);
+         je->jcr->set_killable(false);
 
          Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
             jcr->use_count());
 
          Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
             jcr->use_count());
@@ -617,7 +618,7 @@ static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je)
    if (jcr->job->RescheduleOnError &&
        jcr->JobStatus != JS_Terminated &&
        jcr->JobStatus != JS_Canceled &&
    if (jcr->job->RescheduleOnError &&
        jcr->JobStatus != JS_Terminated &&
        jcr->JobStatus != JS_Canceled &&
-       jcr->get_JobType() == JT_BACKUP &&
+       jcr->getJobType() == JT_BACKUP &&
        (jcr->job->RescheduleTimes == 0 ||
         jcr->reschedule_count < jcr->job->RescheduleTimes)) {
        char dt[50], dt2[50];
        (jcr->job->RescheduleTimes == 0 ||
         jcr->reschedule_count < jcr->job->RescheduleTimes)) {
        char dt[50], dt2[50];
@@ -661,7 +662,7 @@ static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je)
       set_jcr_defaults(njcr, jcr->job);
       njcr->reschedule_count = jcr->reschedule_count;
       njcr->sched_time = jcr->sched_time;
       set_jcr_defaults(njcr, jcr->job);
       njcr->reschedule_count = jcr->reschedule_count;
       njcr->sched_time = jcr->sched_time;
-      njcr->set_JobLevel(jcr->get_JobLevel());
+      njcr->set_JobLevel(jcr->getJobLevel());
       njcr->pool = jcr->pool;
       njcr->run_pool_override = jcr->run_pool_override;
       njcr->full_pool = jcr->full_pool;
       njcr->pool = jcr->pool;
       njcr->run_pool_override = jcr->run_pool_override;
       njcr->full_pool = jcr->full_pool;
@@ -710,7 +711,7 @@ static bool acquire_resources(JCR *jcr)
  *   but we do not really have enough information here to
  *   know if this is really a deadlock (it may be a dual drive
  *   autochanger), and in principle, the SD reservation system
  *   but we do not really have enough information here to
  *   know if this is really a deadlock (it may be a dual drive
  *   autochanger), and in principle, the SD reservation system
- *   should detect these deadlocks, so push the work off on is.
+ *   should detect these deadlocks, so push the work off on it.
  */
 #ifdef xxx
    if (jcr->rstore && jcr->rstore == jcr->wstore) {    /* possible deadlock */
  */
 #ifdef xxx
    if (jcr->rstore && jcr->rstore == jcr->wstore) {    /* possible deadlock */
@@ -723,11 +724,7 @@ static bool acquire_resources(JCR *jcr)
 #endif
    if (jcr->rstore) {
       Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
 #endif
    if (jcr->rstore) {
       Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
-      if (jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs) {
-         jcr->rstore->NumConcurrentReadJobs++;
-         jcr->rstore->NumConcurrentJobs++;
-         Dmsg1(200, "Inc rncj=%d\n", jcr->rstore->NumConcurrentJobs);
-      } else {
+      if (!inc_read_store(jcr)) {
          Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->NumConcurrentJobs);
          set_jcr_job_status(jcr, JS_WaitStoreRes);
          return false;
          Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->NumConcurrentJobs);
          set_jcr_job_status(jcr, JS_WaitStoreRes);
          return false;
@@ -776,12 +773,34 @@ static bool acquire_resources(JCR *jcr)
    return true;
 }
 
    return true;
 }
 
-static void dec_read_store(JCR *jcr)
+static pthread_mutex_t rstore_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+/* 
+ * Note: inc_read_store() and dec_read_store() are
+ *   called from select_rstore() in src/dird/restore.c
+ */
+bool inc_read_store(JCR *jcr)
+{
+   P(rstore_mutex);
+   if (jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs) {
+      jcr->rstore->NumConcurrentReadJobs++;
+      jcr->rstore->NumConcurrentJobs++;
+      Dmsg1(200, "Inc rncj=%d\n", jcr->rstore->NumConcurrentJobs);
+      V(rstore_mutex);
+      return true;
+   }
+   V(rstore_mutex);
+   return false;
+}
+
+void dec_read_store(JCR *jcr)
 {
    if (jcr->rstore) {
 {
    if (jcr->rstore) {
+      P(rstore_mutex);
       jcr->rstore->NumConcurrentReadJobs--;    /* back out rstore */
       jcr->rstore->NumConcurrentJobs--;        /* back out rstore */
       Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
       jcr->rstore->NumConcurrentReadJobs--;    /* back out rstore */
       jcr->rstore->NumConcurrentJobs--;        /* back out rstore */
       Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
+      V(rstore_mutex);
       ASSERT(jcr->rstore->NumConcurrentReadJobs >= 0);
       ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
    }
       ASSERT(jcr->rstore->NumConcurrentReadJobs >= 0);
       ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
    }