]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/dird/jobq.c
correct date
[bacula/bacula] / bacula / src / dird / jobq.c
index c416ee7e2bd3e78f3ccf9d06938dc3826d7bbbef..eb4640efcd7e9c9ff55a19cc01c8316ad4fb13e6 100755 (executable)
@@ -40,7 +40,6 @@
 #include "bacula.h"
 #include "dird.h"
 
-#ifdef JOB_QUEUE
 
 /* Forward referenced functions */
 static void *jobq_server(void *arg);
@@ -243,9 +242,7 @@ int jobq_add(jobq_t *jq, JCR *jcr)
    /* Ensure that at least one server looks at the queue. */
    stat = start_server(jq);
 
-   if (stat == 0) {
-      pthread_mutex_unlock(&jq->mutex);
-   }
+   pthread_mutex_unlock(&jq->mutex);
    Dmsg0(100, "Return jobq_add\n");
    return stat;
 }
@@ -289,9 +286,7 @@ int jobq_remove(jobq_t *jq, JCR *jcr)
    jq->ready_jobs->prepend(item);
    
    stat = start_server(jq);
-   if (stat != 0) {
-      return stat;
-   }
+
    pthread_mutex_unlock(&jq->mutex);
    Dmsg0(100, "Return jobq_remove\n");
    return stat;
@@ -310,7 +305,6 @@ static int start_server(jobq_t *jq)
    if (jq->idle_workers > 0) {
       Dmsg0(100, "Signal worker to wake up\n");
       if ((stat = pthread_cond_signal(&jq->work)) != 0) {
-        pthread_mutex_unlock(&jq->mutex);
         return stat;
       }
    } else if (jq->num_workers < jq->max_workers) {
@@ -318,10 +312,8 @@ static int start_server(jobq_t *jq)
       /* No idle threads so create a new one */
       set_thread_concurrency(jq->max_workers + 1);
       if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
-        pthread_mutex_unlock(&jq->mutex);
         return stat;
       }
-      jq->num_workers++;
    }
    return stat;
 }
@@ -345,6 +337,7 @@ static void *jobq_server(void *arg)
    if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
       return NULL;
    }
+   jq->num_workers++;
 
    for (;;) {
       struct timeval tv;
@@ -387,12 +380,15 @@ static void *jobq_server(void *arg)
         if (!jq->ready_jobs->empty()) {
             Dmsg0(100, "ready queue not empty start server\n");
            if (start_server(jq) != 0) {
+              jq->num_workers--;
+              pthread_mutex_unlock(&jq->mutex);
               return NULL;
            }
         }
         jq->running_jobs->append(je);
          Dmsg1(100, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
         if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
+           jq->num_workers--;
            return NULL;
         }
          /* Call user's routine here */
@@ -400,6 +396,7 @@ static void *jobq_server(void *arg)
         jq->engine(je->jcr);
          Dmsg1(100, "Back from user engine jobid=%d.\n", jcr->JobId);
         if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
+           jq->num_workers--;
            free(je);                 /* release job entry */
            return NULL;
         }
@@ -569,5 +566,3 @@ static void *jobq_server(void *arg)
    Dmsg0(100, "End jobq_server\n");
    return NULL;
 }
-
-#endif /* JOB_QUEUE */