]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/dird/jobq.c
correct date
[bacula/bacula] / bacula / src / dird / jobq.c
index a16f6b48fafddbeab40e9f7b9156d020b90b8659..eb4640efcd7e9c9ff55a19cc01c8316ad4fb13e6 100755 (executable)
@@ -40,7 +40,6 @@
 #include "bacula.h"
 #include "dird.h"
 
-#ifdef JOB_QUEUE
 
 /* Forward referenced functions */
 static void *jobq_server(void *arg);
@@ -140,7 +139,11 @@ struct wait_pkt {
 };
 
 /*
- * Wait until schedule time arrives before starting
+ * Wait until schedule time arrives before starting. Normally
+ *  this routine is only used for jobs started from the console
+ *  for which the user explicitly specified a start time. Otherwise
+ *  most jobs are put into the job queue only when their
+ *  scheduled time arives.
  */
 static void *sched_wait(void *arg)
 {
@@ -236,17 +239,16 @@ int jobq_add(jobq_t *jq, JCR *jcr)
       }
    }
 
+   /* Ensure that at least one server looks at the queue. */
    stat = start_server(jq);
 
-   if (stat == 0) {
-      pthread_mutex_unlock(&jq->mutex);
-   }
+   pthread_mutex_unlock(&jq->mutex);
    Dmsg0(100, "Return jobq_add\n");
    return stat;
 }
 
 /*
- *  Remove a job from the job queue
+ *  Remove a job from the job queue. Used only by cancel Console command.
  *    jq is a queue that was created with jobq_init
  *    work_item is an element of work
  *
@@ -284,9 +286,7 @@ int jobq_remove(jobq_t *jq, JCR *jcr)
    jq->ready_jobs->prepend(item);
    
    stat = start_server(jq);
-   if (stat != 0) {
-      return stat;
-   }
+
    pthread_mutex_unlock(&jq->mutex);
    Dmsg0(100, "Return jobq_remove\n");
    return stat;
@@ -294,7 +294,7 @@ int jobq_remove(jobq_t *jq, JCR *jcr)
 
 
 /*
- * Start the server thread 
+ * Start the server thread if it isn't already running
  */
 static int start_server(jobq_t *jq)
 {
@@ -305,7 +305,6 @@ static int start_server(jobq_t *jq)
    if (jq->idle_workers > 0) {
       Dmsg0(100, "Signal worker to wake up\n");
       if ((stat = pthread_cond_signal(&jq->work)) != 0) {
-        pthread_mutex_unlock(&jq->mutex);
         return stat;
       }
    } else if (jq->num_workers < jq->max_workers) {
@@ -313,10 +312,8 @@ static int start_server(jobq_t *jq)
       /* No idle threads so create a new one */
       set_thread_concurrency(jq->max_workers + 1);
       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
-        pthread_mutex_unlock(&jq->mutex);
         return stat;
       }
-      jq->num_workers++;
    }
    return stat;
 }
@@ -340,6 +337,7 @@ static void *jobq_server(void *arg)
    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
       return NULL;
    }
+   jq->num_workers++;
 
    for (;;) {
       struct timeval tv;
@@ -375,24 +373,30 @@ static void *jobq_server(void *arg)
        */
       Dmsg0(100, "Checking ready queue.\n");
       while (!jq->ready_jobs->empty() && !jq->quit) {
+        JCR *jcr;
         je = (jobq_item_t *)jq->ready_jobs->first(); 
+        jcr = je->jcr;
         jq->ready_jobs->remove(je);
         if (!jq->ready_jobs->empty()) {
             Dmsg0(100, "ready queue not empty start server\n");
            if (start_server(jq) != 0) {
+              jq->num_workers--;
+              pthread_mutex_unlock(&jq->mutex);
               return NULL;
            }
         }
         jq->running_jobs->append(je);
-         Dmsg1(100, "Took jobid=%d from ready and appended to run\n", je->jcr->JobId);
+         Dmsg1(100, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
         if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
+           jq->num_workers--;
            return NULL;
         }
          /* Call user's routine here */
-         Dmsg1(100, "Calling user engine for jobid=%d\n", je->jcr->JobId);
+         Dmsg1(100, "Calling user engine for jobid=%d\n", jcr->JobId);
         jq->engine(je->jcr);
-         Dmsg1(100, "Back from user engine jobid=%d.\n", je->jcr->JobId);
+         Dmsg1(100, "Back from user engine jobid=%d.\n", jcr->JobId);
         if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
+           jq->num_workers--;
            free(je);                 /* release job entry */
            return NULL;
         }
@@ -403,12 +407,64 @@ static void *jobq_server(void *arg)
          *  been acquired for jobs canceled before they were
          *  put into the ready queue.
          */
-        if (je->jcr->acquired_resource_locks) {
-           je->jcr->store->NumConcurrentJobs--;
-           je->jcr->client->NumConcurrentJobs--;
-           je->jcr->job->NumConcurrentJobs--;
+        if (jcr->acquired_resource_locks) {
+           jcr->store->NumConcurrentJobs--;
+           jcr->client->NumConcurrentJobs--;
+           jcr->job->NumConcurrentJobs--;
+        }
+
+        if (jcr->job->RescheduleOnError && 
+            jcr->JobStatus != JS_Terminated &&
+            jcr->JobStatus != JS_Canceled && 
+            jcr->job->RescheduleTimes > 0 && 
+            jcr->reschedule_count < jcr->job->RescheduleTimes) {
+
+            /*
+             * Reschedule this job by cleaning it up, but
+             *  reuse the same JobId if possible.
+             */
+           jcr->reschedule_count++;
+           jcr->sched_time = time(NULL) + jcr->job->RescheduleInterval;
+            Dmsg2(100, "Rescheduled Job %s to re-run in %d seconds.\n", jcr->Job,
+              (int)jcr->job->RescheduleInterval);
+           jcr->JobStatus = JS_Created; /* force new status */
+           dird_free_jcr(jcr);          /* partial cleanup old stuff */
+           if (jcr->JobBytes == 0) {
+               Dmsg1(100, "Requeue job=%d\n", jcr->JobId);
+              V(jq->mutex);
+              jobq_add(jq, jcr);     /* queue the job to run again */
+              P(jq->mutex);
+              free(je);              /* free the job entry */
+              continue;              /* look for another job to run */
+           }
+           /* 
+            * Something was actually backed up, so we cannot reuse
+            *   the old JobId or there will be database record
+            *   conflicts.  We now create a new job, copying the
+            *   appropriate fields.
+            */
+           JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr);
+           set_jcr_defaults(njcr, jcr->job);
+           njcr->reschedule_count = jcr->reschedule_count;
+           njcr->JobLevel = jcr->JobLevel;
+           njcr->JobStatus = jcr->JobStatus;
+           njcr->pool = jcr->pool;
+           njcr->store = jcr->store;
+           njcr->messages = jcr->messages;
+            Dmsg0(100, "Call to run new job\n");
+           V(jq->mutex);
+            run_job(njcr);            /* This creates a "new" job */
+           P(jq->mutex);
+            Dmsg0(100, "Back from running new job.\n");
+        }
+        /* Clean up and release old jcr */
+        if (jcr->db) {
+            Dmsg0(100, "Close DB\n");
+           db_close_database(jcr, jcr->db);
+           jcr->db = NULL;
         }
-        free_jcr(je->jcr);
+         Dmsg1(100, "====== Termination job=%d\n", jcr->JobId);
+        free_jcr(jcr);
         free(je);                    /* release job entry */
       }
       /*
@@ -428,9 +484,11 @@ static void *jobq_server(void *arg)
             Dmsg1(100, "Set Job pri=%d\n", Priority);
         }
         /*
-         * Acquire locks
+         * Walk down the list of waiting jobs and attempt
+         *   to acquire the resources it needs.
          */
         for ( ; je;  ) {
+           /* je is current job item on the queue, jn is the next one */
            JCR *jcr = je->jcr;
            jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
             Dmsg3(100, "Examining Job=%d JobPri=%d want Pri=%d\n",
@@ -464,6 +522,9 @@ static void *jobq_server(void *arg)
               je = jn;
               continue;
            }
+           /* Got all locks, now remove it from wait queue and append it
+            *   to the ready queue  
+            */
            jcr->acquired_resource_locks = true;
            jq->waiting_jobs->remove(je);
            jq->ready_jobs->append(je);
@@ -505,5 +566,3 @@ static void *jobq_server(void *arg)
    Dmsg0(100, "End jobq_server\n");
    return NULL;
 }
-
-#endif /* JOB_QUEUE */