]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/dird/jobq.c
correct date
[bacula/bacula] / bacula / src / dird / jobq.c
index 176893911858cfc1f0cd3b2001271d525f851af4..eb4640efcd7e9c9ff55a19cc01c8316ad4fb13e6 100755 (executable)
@@ -40,7 +40,6 @@
 #include "bacula.h"
 #include "dird.h"
 
-#ifdef JOB_QUEUE
 
 /* Forward referenced functions */
 static void *jobq_server(void *arg);
@@ -140,7 +139,11 @@ struct wait_pkt {
 };
 
 /*
- * Wait until schedule time arrives before starting
+ * Wait until schedule time arrives before starting. Normally
+ *  this routine is only used for jobs started from the console
+ *  for which the user explicitly specified a start time. Otherwise
+ *  most jobs are put into the job queue only when their
+ *  scheduled time arives.
  */
 static void *sched_wait(void *arg)
 {
@@ -236,17 +239,16 @@ int jobq_add(jobq_t *jq, JCR *jcr)
       }
    }
 
+   /* Ensure that at least one server looks at the queue. */
    stat = start_server(jq);
 
-   if (stat == 0) {
-      pthread_mutex_unlock(&jq->mutex);
-   }
+   pthread_mutex_unlock(&jq->mutex);
    Dmsg0(100, "Return jobq_add\n");
    return stat;
 }
 
 /*
- *  Remove a job from the job queue
+ *  Remove a job from the job queue. Used only by cancel Console command.
  *    jq is a queue that was created with jobq_init
  *    work_item is an element of work
  *
@@ -284,9 +286,7 @@ int jobq_remove(jobq_t *jq, JCR *jcr)
    jq->ready_jobs->prepend(item);
    
    stat = start_server(jq);
-   if (stat != 0) {
-      return stat;
-   }
+
    pthread_mutex_unlock(&jq->mutex);
    Dmsg0(100, "Return jobq_remove\n");
    return stat;
@@ -294,7 +294,7 @@ int jobq_remove(jobq_t *jq, JCR *jcr)
 
 
 /*
- * Start the server thread 
+ * Start the server thread if it isn't already running
  */
 static int start_server(jobq_t *jq)
 {
@@ -305,7 +305,6 @@ static int start_server(jobq_t *jq)
    if (jq->idle_workers > 0) {
       Dmsg0(100, "Signal worker to wake up\n");
       if ((stat = pthread_cond_signal(&jq->work)) != 0) {
-        pthread_mutex_unlock(&jq->mutex);
         return stat;
       }
    } else if (jq->num_workers < jq->max_workers) {
@@ -313,10 +312,8 @@ static int start_server(jobq_t *jq)
       /* No idle threads so create a new one */
       set_thread_concurrency(jq->max_workers + 1);
       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
-        pthread_mutex_unlock(&jq->mutex);
         return stat;
       }
-      jq->num_workers++;
    }
    return stat;
 }
@@ -340,6 +337,7 @@ static void *jobq_server(void *arg)
    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
       return NULL;
    }
+   jq->num_workers++;
 
    for (;;) {
       struct timeval tv;
@@ -382,12 +380,15 @@ static void *jobq_server(void *arg)
         if (!jq->ready_jobs->empty()) {
             Dmsg0(100, "ready queue not empty start server\n");
            if (start_server(jq) != 0) {
+              jq->num_workers--;
+              pthread_mutex_unlock(&jq->mutex);
               return NULL;
            }
         }
         jq->running_jobs->append(je);
          Dmsg1(100, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
         if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
+           jq->num_workers--;
            return NULL;
         }
          /* Call user's routine here */
@@ -395,6 +396,7 @@ static void *jobq_server(void *arg)
         jq->engine(je->jcr);
          Dmsg1(100, "Back from user engine jobid=%d.\n", jcr->JobId);
         if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
+           jq->num_workers--;
            free(je);                 /* release job entry */
            return NULL;
         }
@@ -433,7 +435,7 @@ static void *jobq_server(void *arg)
               jobq_add(jq, jcr);     /* queue the job to run again */
               P(jq->mutex);
               free(je);              /* free the job entry */
-              continue;
+              continue;              /* look for another job to run */
            }
            /* 
             * Something was actually backed up, so we cannot reuse
@@ -451,7 +453,7 @@ static void *jobq_server(void *arg)
            njcr->messages = jcr->messages;
             Dmsg0(100, "Call to run new job\n");
            V(jq->mutex);
-           run_job(njcr);
+            run_job(njcr);            /* This creates a "new" job */
            P(jq->mutex);
             Dmsg0(100, "Back from running new job.\n");
         }
@@ -482,9 +484,11 @@ static void *jobq_server(void *arg)
             Dmsg1(100, "Set Job pri=%d\n", Priority);
         }
         /*
-         * Acquire locks
+         * Walk down the list of waiting jobs and attempt
+         *   to acquire the resources it needs.
          */
         for ( ; je;  ) {
+           /* je is current job item on the queue, jn is the next one */
            JCR *jcr = je->jcr;
            jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
             Dmsg3(100, "Examining Job=%d JobPri=%d want Pri=%d\n",
@@ -518,6 +522,9 @@ static void *jobq_server(void *arg)
               je = jn;
               continue;
            }
+           /* Got all locks, now remove it from wait queue and append it
+            *   to the ready queue  
+            */
            jcr->acquired_resource_locks = true;
            jq->waiting_jobs->remove(je);
            jq->ready_jobs->append(je);
@@ -559,5 +566,3 @@ static void *jobq_server(void *arg)
    Dmsg0(100, "End jobq_server\n");
    return NULL;
 }
-
-#endif /* JOB_QUEUE */