]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/lib/workq.c
Fix typo.
[bacula/bacula] / bacula / src / lib / workq.c
index 2744d23c92e7f25a4f7ee1b7bd3f2b853c93e4a4..1f957d53043b92f444f477c6fb83283b22aa746f 100644 (file)
@@ -6,7 +6,7 @@
    The main author of Bacula is Kern Sibbald, with contributions from
    many others, a complete list can be found in the file AUTHORS.
    This program is Free Software; you can redistribute it and/or
-   modify it under the terms of version two of the GNU General Public
+   modify it under the terms of version three of the GNU Affero General Public
    License as published by the Free Software Foundation and included
    in the file LICENSE.
 
@@ -15,7 +15,7 @@
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    General Public License for more details.
 
-   You should have received a copy of the GNU General Public License
+   You should have received a copy of the GNU Affero General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
    02110-1301, USA.
@@ -58,6 +58,7 @@
  */
 
 #include "bacula.h"
+#include "jcr.h"
 
 /* Forward referenced functions */
 extern "C" void *workq_server(void *arg);
@@ -111,9 +112,7 @@ int workq_destroy(workq_t *wq)
   if (wq->valid != WORKQ_VALID) {
      return EINVAL;
   }
-  if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
-     return stat;
-  }
+  P(wq->mutex);
   wq->valid = 0;                      /* prevent any more operations */
 
   /*
@@ -123,20 +122,18 @@ int workq_destroy(workq_t *wq)
      wq->quit = 1;
      if (wq->idle_workers) {
         if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
-           pthread_mutex_unlock(&wq->mutex);
+           V(wq->mutex);
            return stat;
         }
      }
      while (wq->num_workers > 0) {
         if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
-           pthread_mutex_unlock(&wq->mutex);
+           V(wq->mutex);
            return stat;
         }
      }
   }
-  if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
-     return stat;
-  }
+  V(wq->mutex);
   stat  = pthread_mutex_destroy(&wq->mutex);
   stat1 = pthread_cond_destroy(&wq->work);
   stat2 = pthread_attr_destroy(&wq->attr);
@@ -155,7 +152,7 @@ int workq_destroy(workq_t *wq)
  */
 int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
 {
-   int stat;
+   int stat=0;
    workq_ele_t *item;
    pthread_t id;
 
@@ -169,10 +166,7 @@ int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
    }
    item->data = element;
    item->next = NULL;
-   if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
-      free(item);
-      return stat;
-   }
+   P(wq->mutex);
 
    Dmsg0(1400, "add item to queue\n");
    if (priority) {
@@ -198,7 +192,7 @@ int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
    if (wq->idle_workers > 0) {
       Dmsg0(1400, "Signal worker\n");
       if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
-         pthread_mutex_unlock(&wq->mutex);
+         V(wq->mutex);
          return stat;
       }
    } else if (wq->num_workers < wq->max_workers) {
@@ -206,12 +200,12 @@ int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
       /* No idle threads so create a new one */
       set_thread_concurrency(wq->max_workers + 1);
       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
-         pthread_mutex_unlock(&wq->mutex);
+         V(wq->mutex);
          return stat;
       }
       wq->num_workers++;
    }
-   pthread_mutex_unlock(&wq->mutex);
+   V(wq->mutex);
    Dmsg0(1400, "Return workq_add\n");
    /* Return work_item if requested */
    if (work_item) {
@@ -240,9 +234,7 @@ int workq_remove(workq_t *wq, workq_ele_t *work_item)
       return EINVAL;
    }
 
-   if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
-      return stat;
-   }
+   P(wq->mutex);
 
    for (prev=item=wq->first; item; item=item->next) {
       if (item == work_item) {
@@ -269,7 +261,7 @@ int workq_remove(workq_t *wq, workq_ele_t *work_item)
    if (wq->idle_workers > 0) {
       Dmsg0(1400, "Signal worker\n");
       if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
-         pthread_mutex_unlock(&wq->mutex);
+         V(wq->mutex);
          return stat;
       }
    } else {
@@ -277,12 +269,12 @@ int workq_remove(workq_t *wq, workq_ele_t *work_item)
       /* No idle threads so create a new one */
       set_thread_concurrency(wq->max_workers + 1);
       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
-         pthread_mutex_unlock(&wq->mutex);
+         V(wq->mutex);
          return stat;
       }
       wq->num_workers++;
    }
-   pthread_mutex_unlock(&wq->mutex);
+   V(wq->mutex);
    Dmsg0(1400, "Return workq_remove\n");
    return stat;
 }
@@ -301,9 +293,8 @@ void *workq_server(void *arg)
    int stat, timedout;
 
    Dmsg0(1400, "Start workq_server\n");
-   if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
-      return NULL;
-   }
+   P(wq->mutex);
+   set_jcr_in_tsd(INVALID_JCR);
 
    for (;;) {
       struct timeval tv;
@@ -326,7 +317,7 @@ void *workq_server(void *arg)
           * time that pthread_cond_timedwait() is called
           * so fake it out.
           */
-         pthread_mutex_lock(&wq->mutex);
+         P(wq->mutex);
          stat = ETIMEDOUT;
 #else
          stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
@@ -339,7 +330,7 @@ void *workq_server(void *arg)
             /* This shouldn't happen */
             Dmsg0(1400, "This shouldn't happen\n");
             wq->num_workers--;
-            pthread_mutex_unlock(&wq->mutex);
+            V(wq->mutex);
             return NULL;
          }
       }
@@ -349,18 +340,14 @@ void *workq_server(void *arg)
          if (wq->last == we) {
             wq->last = NULL;
          }
-         if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
-            return NULL;
-         }
+         V(wq->mutex);
          /* Call user's routine here */
          Dmsg0(1400, "Calling user engine.\n");
          wq->engine(we->data);
          Dmsg0(1400, "Back from user engine.\n");
          free(we);                    /* release work entry */
          Dmsg0(1400, "relock mutex\n");
-         if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
-            return NULL;
-         }
+         P(wq->mutex);
          Dmsg0(1400, "Done lock mutex\n");
       }
       /*
@@ -374,7 +361,7 @@ void *workq_server(void *arg)
             pthread_cond_broadcast(&wq->work);
          }
          Dmsg0(1400, "Unlock mutex\n");
-         pthread_mutex_unlock(&wq->mutex);
+         V(wq->mutex);
          Dmsg0(1400, "Return from workq_server\n");
          return NULL;
       }
@@ -393,7 +380,7 @@ void *workq_server(void *arg)
    } /* end of big for loop */
 
    Dmsg0(1400, "unlock mutex\n");
-   pthread_mutex_unlock(&wq->mutex);
+   V(wq->mutex);
    Dmsg0(1400, "End workq_server\n");
    return NULL;
 }