]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/dird/jobq.c
kes Apply Eric's patch to ensure that autoprune does not return
[bacula/bacula] / bacula / src / dird / jobq.c
index 308acd77307b378da3f8f44bd505cebba34b39ae..af8605268d300db1aa10e27c3d8ae6d333921def 100644 (file)
@@ -56,8 +56,9 @@ extern "C" void *sched_wait(void *arg);
 
 static int  start_server(jobq_t *jq);
 static bool acquire_resources(JCR *jcr);
-
-
+static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
+static void dec_read_store(JCR *jcr);
+static void dec_write_store(JCR *jcr);
 
 /*
  * Initialize a job queue
@@ -477,89 +478,17 @@ void *jobq_server(void *arg)
           *  put into the ready queue.
           */
          if (jcr->acquired_resource_locks) {
-            if (jcr->rstore) {
-               jcr->rstore->NumConcurrentJobs--;
-               Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
-               ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
-            }
-            if (jcr->wstore) {
-               jcr->wstore->NumConcurrentJobs--;
-               Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
-               ASSERT(jcr->wstore->NumConcurrentJobs >= 0);
-            }
+            dec_read_store(jcr);
+            dec_write_store(jcr);
             jcr->client->NumConcurrentJobs--;
             jcr->job->NumConcurrentJobs--;
             jcr->acquired_resource_locks = false;
          }
 
-         /*
-          * Reschedule the job if necessary and requested
-          */
-         if (jcr->job->RescheduleOnError &&
-             jcr->JobStatus != JS_Terminated &&
-             jcr->JobStatus != JS_Canceled &&
-             jcr->JobType == JT_BACKUP &&
-             (jcr->job->RescheduleTimes == 0 ||
-              jcr->reschedule_count < jcr->job->RescheduleTimes)) {
-             char dt[50], dt2[50];
-
-             /*
-              * Reschedule this job by cleaning it up, but
-              *  reuse the same JobId if possible.
-              */
-            time_t now = time(NULL);
-            jcr->reschedule_count++;
-            jcr->sched_time = now + jcr->job->RescheduleInterval;
-            bstrftime(dt, sizeof(dt), now);
-            bstrftime(dt2, sizeof(dt2), jcr->sched_time);
-            Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
-                  (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
-            Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
-                 jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
-            dird_free_jcr_pointers(jcr);     /* partial cleanup old stuff */
-            jcr->JobStatus = -1;
-            set_jcr_job_status(jcr, JS_WaitStartTime);
-            jcr->SDJobStatus = 0;
-            if (jcr->JobBytes == 0) {
-               Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
-               V(jq->mutex);
-               jobq_add(jq, jcr);     /* queue the job to run again */
-               P(jq->mutex);
-               free_jcr(jcr);         /* release jcr */
-               free(je);              /* free the job entry */
-               continue;              /* look for another job to run */
-            }
-            /*
-             * Something was actually backed up, so we cannot reuse
-             *   the old JobId or there will be database record
-             *   conflicts.  We now create a new job, copying the
-             *   appropriate fields.
-             */           
-            JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
-            set_jcr_defaults(njcr, jcr->job);
-            njcr->reschedule_count = jcr->reschedule_count;
-            njcr->sched_time = jcr->sched_time;
-            njcr->JobLevel = jcr->JobLevel;
-            njcr->JobStatus = -1;
-            set_jcr_job_status(njcr, jcr->JobStatus);
-            if (jcr->rstore) {
-               copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
-            } else {
-               free_rstorage(njcr);
-            }
-            if (jcr->wstore) {
-               copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
-            } else {
-               free_wstorage(njcr);
-            }
-            njcr->messages = jcr->messages;
-            Dmsg0(2300, "Call to run new job\n");
-            V(jq->mutex);
-            run_job(njcr);            /* This creates a "new" job */
-            free_jcr(njcr);           /* release "new" jcr */
-            P(jq->mutex);
-            Dmsg0(2300, "Back from running new job.\n");
+         if (reschedule_job(jcr, jq, je)) {
+            continue;              /* go look for more work */
          }
+
          /* Clean up and release old jcr */
          Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
          jcr->SDJobStatus = 0;
@@ -672,6 +601,94 @@ void *jobq_server(void *arg)
    return NULL;
 }
 
+/*
+ * Returns true if cleanup done and we should look for more work
+ */
+static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je)
+{
+   /*
+    * Reschedule the job if necessary and requested
+    */
+   if (jcr->job->RescheduleOnError &&
+       jcr->JobStatus != JS_Terminated &&
+       jcr->JobStatus != JS_Canceled &&
+       jcr->JobType == JT_BACKUP &&
+       (jcr->job->RescheduleTimes == 0 ||
+        jcr->reschedule_count < jcr->job->RescheduleTimes)) {
+       char dt[50], dt2[50];
+
+       /*
+        * Reschedule this job by cleaning it up, but
+        *  reuse the same JobId if possible.
+        */
+      time_t now = time(NULL);
+      jcr->reschedule_count++;
+      jcr->sched_time = now + jcr->job->RescheduleInterval;
+      bstrftime(dt, sizeof(dt), now);
+      bstrftime(dt2, sizeof(dt2), jcr->sched_time);
+      Dmsg4(2300, "Rescheduled Job %s to re-run in %d seconds.(now=%u,then=%u)\n", jcr->Job,
+            (int)jcr->job->RescheduleInterval, now, jcr->sched_time);
+      Jmsg(jcr, M_INFO, 0, _("Rescheduled Job %s at %s to re-run in %d seconds (%s).\n"),
+           jcr->Job, dt, (int)jcr->job->RescheduleInterval, dt2);
+      dird_free_jcr_pointers(jcr);     /* partial cleanup old stuff */
+      jcr->JobStatus = -1;
+      set_jcr_job_status(jcr, JS_WaitStartTime);
+      jcr->SDJobStatus = 0;
+      if (!allow_duplicate_job(jcr)) {
+         return false;
+      }
+      if (jcr->JobBytes == 0) {
+         Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
+         V(jq->mutex);
+         jobq_add(jq, jcr);     /* queue the job to run again */
+         P(jq->mutex);
+         free_jcr(jcr);         /* release jcr */
+         free(je);              /* free the job entry */
+         return true;           /* we already cleaned up */
+      }
+      /*
+       * Something was actually backed up, so we cannot reuse
+       *   the old JobId or there will be database record
+       *   conflicts.  We now create a new job, copying the
+       *   appropriate fields.
+       */           
+      JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
+      set_jcr_defaults(njcr, jcr->job);
+      njcr->reschedule_count = jcr->reschedule_count;
+      njcr->sched_time = jcr->sched_time;
+      njcr->JobLevel = jcr->JobLevel;
+      njcr->pool = jcr->pool;
+      njcr->run_pool_override = jcr->run_pool_override;
+      njcr->full_pool = jcr->full_pool;
+      njcr->run_full_pool_override = jcr->run_full_pool_override;
+      njcr->inc_pool = jcr->inc_pool;
+      njcr->run_inc_pool_override = jcr->run_inc_pool_override;
+      njcr->diff_pool = jcr->diff_pool;
+      njcr->JobStatus = -1;
+      set_jcr_job_status(njcr, jcr->JobStatus);
+      if (jcr->rstore) {
+         copy_rstorage(njcr, jcr->rstorage, _("previous Job"));
+      } else {
+         free_rstorage(njcr);
+      }
+      if (jcr->wstore) {
+         copy_wstorage(njcr, jcr->wstorage, _("previous Job"));
+      } else {
+         free_wstorage(njcr);
+      }
+      njcr->messages = jcr->messages;
+      njcr->spool_data = jcr->spool_data;
+      njcr->write_part_after_job = jcr->write_part_after_job;
+      Dmsg0(2300, "Call to run new job\n");
+      V(jq->mutex);
+      run_job(njcr);            /* This creates a "new" job */
+      free_jcr(njcr);           /* release "new" jcr */
+      P(jq->mutex);
+      Dmsg0(2300, "Back from running new job.\n");
+   }
+   return false;
+}
+
 /*
  * See if we can acquire all the necessary resources for the job (JCR)
  *
@@ -692,15 +709,10 @@ static bool acquire_resources(JCR *jcr)
    }
    if (jcr->rstore) {
       Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
-      if (jcr->rstore->NumConcurrentJobs == 0 &&
-          jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs) {
-         /* Simple case, first job */
-         jcr->rstore->NumConcurrentJobs = 1;
-         Dmsg0(200, "Set rncj=1\n");
-      /* We can do this only if multi-drive autochanger */
-//    } else if (jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs) {
-//       jcr->rstore->NumConcurrentJobs++;
-//       Dmsg1(200, "Inc rncj=%d\n", jcr->rstore->NumConcurrentJobs);
+      if (jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs) {
+         jcr->rstore->NumConcurrentReadJobs++;
+         jcr->rstore->NumConcurrentJobs++;
+         Dmsg1(200, "Inc rncj=%d\n", jcr->rstore->NumConcurrentJobs);
       } else {
          Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->NumConcurrentJobs);
          set_jcr_job_status(jcr, JS_WaitStoreRes);
@@ -710,18 +722,11 @@ static bool acquire_resources(JCR *jcr)
    
    if (jcr->wstore) {
       Dmsg1(200, "Wstore=%s\n", jcr->wstore->name());
-      if (jcr->wstore->NumConcurrentJobs == 0 &&
-          jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
-         /* Simple case, first job */
-         jcr->wstore->NumConcurrentJobs = 1;
-         Dmsg0(200, "Set wncj=1\n");
-      } else if (jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
+      if (jcr->wstore->NumConcurrentJobs < jcr->wstore->MaxConcurrentJobs) {
          jcr->wstore->NumConcurrentJobs++;
          Dmsg1(200, "Inc wncj=%d\n", jcr->wstore->NumConcurrentJobs);
       } else if (jcr->rstore) {
-         jcr->rstore->NumConcurrentJobs--;        /* back out rstore */
-         Dmsg1(200, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
-         ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
+         dec_read_store(jcr);
          skip_this_jcr = true;
       } else {
          Dmsg1(200, "Fail wncj=%d\n", jcr->wstore->NumConcurrentJobs);
@@ -737,16 +742,8 @@ static bool acquire_resources(JCR *jcr)
       jcr->client->NumConcurrentJobs++;
    } else {
       /* Back out previous locks */
-      if (jcr->wstore) {
-         jcr->wstore->NumConcurrentJobs--;
-         Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
-         ASSERT(jcr->wstore->NumConcurrentJobs >= 0);
-      }
-      if (jcr->rstore) {
-         jcr->rstore->NumConcurrentJobs--;  
-         Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
-         ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
-      }
+      dec_write_store(jcr);
+      dec_read_store(jcr);
       set_jcr_job_status(jcr, JS_WaitClientRes);
       return false;
    }
@@ -754,16 +751,8 @@ static bool acquire_resources(JCR *jcr)
       jcr->job->NumConcurrentJobs++;
    } else {
       /* Back out previous locks */
-      if (jcr->wstore) {
-         jcr->wstore->NumConcurrentJobs--;
-         Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
-         ASSERT(jcr->wstore->NumConcurrentJobs >= 0);
-      }
-      if (jcr->rstore) {
-         jcr->rstore->NumConcurrentJobs--;
-         Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
-         ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
-      }
+      dec_write_store(jcr);
+      dec_read_store(jcr);
       jcr->client->NumConcurrentJobs--;
       set_jcr_job_status(jcr, JS_WaitJobRes);
       return false;
@@ -772,3 +761,23 @@ static bool acquire_resources(JCR *jcr)
    jcr->acquired_resource_locks = true;
    return true;
 }
+
+static void dec_read_store(JCR *jcr)
+{
+   if (jcr->rstore) {
+      jcr->rstore->NumConcurrentReadJobs--;    /* back out rstore */
+      jcr->rstore->NumConcurrentJobs--;        /* back out rstore */
+      Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
+      ASSERT(jcr->rstore->NumConcurrentReadJobs >= 0);
+      ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
+   }
+}
+
+static void dec_write_store(JCR *jcr)
+{
+   if (jcr->wstore) {
+      jcr->wstore->NumConcurrentJobs--;
+      Dmsg1(200, "Dec wncj=%d\n", jcr->wstore->NumConcurrentJobs);
+      ASSERT(jcr->wstore->NumConcurrentJobs >= 0);
+   }
+}