]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/lib/workq.c
Final changes
[bacula/bacula] / bacula / src / lib / workq.c
index 3ccc663923a54d565280251d839472df2dffb25e..dbe0001ffff78551386943c16b80cbad422a56da 100755 (executable)
@@ -56,12 +56,12 @@ static void *workq_server(void *arg);
  * Initialize a work queue
  *
  *  Returns: 0 on success
- *           errno on failure
+ *          errno on failure
  */
-int workq_init(workq_t *wq, int threads, void (*engine)(void *arg))
+int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
 {
    int stat;
-                        
+                       
    if ((stat = pthread_attr_init(&wq->attr)) != 0) {
       return stat;
    }
@@ -80,10 +80,10 @@ int workq_init(workq_t *wq, int threads, void (*engine)(void *arg))
    }
    wq->quit = 0;
    wq->first = wq->last = NULL;
-   wq->max_workers = threads;         /* max threads to create */
-   wq->num_workers = 0;               /* no threads yet */
-   wq->idle_workers = 0;              /* no idle threads */
-   wq->engine = engine;               /* routine to run */
+   wq->max_workers = threads;        /* max threads to create */
+   wq->num_workers = 0;              /* no threads yet */
+   wq->idle_workers = 0;             /* no idle threads */
+   wq->engine = engine;              /* routine to run */
    wq->valid = WORKQ_VALID; 
    return 0;
 }
@@ -92,7 +92,7 @@ int workq_init(workq_t *wq, int threads, void (*engine)(void *arg))
  * Destroy a work queue
  *
  * Returns: 0 on success
- *          errno on failure
+ *         errno on failure
  */
 int workq_destroy(workq_t *wq)
 {
@@ -104,7 +104,7 @@ int workq_destroy(workq_t *wq)
   if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
      return stat;
   }
-  wq->valid = 0;                      /* prevent any more operations */
+  wq->valid = 0;                     /* prevent any more operations */
 
   /* 
    * If any threads are active, wake them 
@@ -112,22 +112,22 @@ int workq_destroy(workq_t *wq)
   if (wq->num_workers > 0) {
      wq->quit = 1;
      if (wq->idle_workers) {
-        if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
-           pthread_mutex_unlock(&wq->mutex);
-           return stat;
-        }
+       if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
+          pthread_mutex_unlock(&wq->mutex);
+          return stat;
+       }
      }
      while (wq->num_workers > 0) {
-        if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
-           pthread_mutex_unlock(&wq->mutex);
-           return stat;
-        }
+       if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
+          pthread_mutex_unlock(&wq->mutex);
+          return stat;
+       }
      }
   }
   if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
      return stat;
   }
-  stat  = pthread_mutex_destroy(&wq->mutex);
+  stat = pthread_mutex_destroy(&wq->mutex);
   stat1 = pthread_cond_destroy(&wq->work);
   stat2 = pthread_attr_destroy(&wq->attr);
   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
@@ -138,10 +138,10 @@ int workq_destroy(workq_t *wq)
  *  Add work to a queue
  *    wq is a queue that was created with workq_init
  *    element is a user unique item that will be passed to the 
- *        processing routine
+ *       processing routine
  *    work_item will get internal work queue item -- if it is not NULL
  *    priority if non-zero will cause the item to be placed on the
- *        head of the list instead of the tail.
+ *       head of the list instead of the tail.
  */
 int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
 {
@@ -168,18 +168,18 @@ int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
    if (priority) {
       /* Add to head of queue */
       if (wq->first == NULL) {
-         wq->first = item;
-         wq->last = item;
+        wq->first = item;
+        wq->last = item;
       } else {
-         item->next = wq->first;
-         wq->first = item;
+        item->next = wq->first;
+        wq->first = item;
       }
    } else {
       /* Add to end of queue */
       if (wq->first == NULL) {
-         wq->first = item;
+        wq->first = item;
       } else {
-         wq->last->next = item;
+        wq->last->next = item;
       }
       wq->last = item;
    }
@@ -188,16 +188,16 @@ int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
    if (wq->idle_workers > 0) {
       Dmsg0(200, "Signal worker\n");
       if ((stat = pthread_cond_signal(&wq->work)) != 0) {
-         pthread_mutex_unlock(&wq->mutex);
-         return stat;
+        pthread_mutex_unlock(&wq->mutex);
+        return stat;
       }
    } else if (wq->num_workers < wq->max_workers) {
       Dmsg0(200, "Create worker thread\n");
       /* No idle threads so create a new one */
       set_thread_concurrency(wq->max_workers + 1);
       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
-         pthread_mutex_unlock(&wq->mutex);
-         return stat;
+        pthread_mutex_unlock(&wq->mutex);
+        return stat;
       }
       wq->num_workers++;
    }
@@ -236,8 +236,8 @@ int workq_remove(workq_t *wq, workq_ele_t *work_item)
 
    for (prev=item=wq->first; item; item=item->next) {
       if (item == work_item) {
-         found = 1;
-         break;
+        found = 1;
+        break;
       }
       prev = item;
    }
@@ -249,7 +249,7 @@ int workq_remove(workq_t *wq, workq_ele_t *work_item)
    if (wq->first != work_item) {
       prev->next = work_item->next;   
       if (wq->last == work_item) {
-         wq->last = prev;
+        wq->last = prev;
       }
       work_item->next = wq->first;
       wq->first = work_item;
@@ -259,16 +259,16 @@ int workq_remove(workq_t *wq, workq_ele_t *work_item)
    if (wq->idle_workers > 0) {
       Dmsg0(200, "Signal worker\n");
       if ((stat = pthread_cond_signal(&wq->work)) != 0) {
-         pthread_mutex_unlock(&wq->mutex);
-         return stat;
+        pthread_mutex_unlock(&wq->mutex);
+        return stat;
       }
    } else {
       Dmsg0(200, "Create worker thread\n");
       /* No idle threads so create a new one */
       set_thread_concurrency(wq->max_workers + 1);
       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
-         pthread_mutex_unlock(&wq->mutex);
-         return stat;
+        pthread_mutex_unlock(&wq->mutex);
+        return stat;
       }
       wq->num_workers++;
    }
@@ -306,66 +306,66 @@ static void *workq_server(void *arg)
       timeout.tv_sec = tv.tv_sec + 2;
 
       while (wq->first == NULL && !wq->quit) {
-         /*
-          * Wait 2 seconds, then if no more work, exit
-          */
+        /*
+         * Wait 2 seconds, then if no more work, exit
+         */
          Dmsg0(200, "pthread_cond_timedwait()\n");
 #ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN
-         /* CYGWIN dies with a page fault the second
-          * time that pthread_cond_timedwait() is called
-          * so fake it out.
-          */
-         pthread_mutex_lock(&wq->mutex);
-         stat = ETIMEDOUT;
+        /* CYGWIN dies with a page fault the second
+         * time that pthread_cond_timedwait() is called
+         * so fake it out.
+         */
+        pthread_mutex_lock(&wq->mutex);
+        stat = ETIMEDOUT;
 #else
-         stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
+        stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
 #endif
          Dmsg1(200, "timedwait=%d\n", stat);
-         if (stat == ETIMEDOUT) {
-            timedout = 1;
-            break;
-         } else if (stat != 0) {
+        if (stat == ETIMEDOUT) {
+           timedout = 1;
+           break;
+        } else if (stat != 0) {
             /* This shouldn't happen */
             Dmsg0(200, "This shouldn't happen\n");
-            wq->num_workers--;
-            pthread_mutex_unlock(&wq->mutex);
-            return NULL;
-         }
+           wq->num_workers--;
+           pthread_mutex_unlock(&wq->mutex);
+           return NULL;
+        }
       } 
       we = wq->first;
       if (we != NULL) {
-         wq->first = we->next;
-         if (wq->last == we) {
-            wq->last = NULL;
-         }
-         if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
-            return NULL;
-         }
+        wq->first = we->next;
+        if (wq->last == we) {
+           wq->last = NULL;
+        }
+        if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
+           return NULL;
+        }
          /* Call user's routine here */
          Dmsg0(200, "Calling user engine.\n");
-         wq->engine(we->data);
+        wq->engine(we->data);
          Dmsg0(200, "Back from user engine.\n");
-         free(we);                    /* release work entry */
+        free(we);                    /* release work entry */
          Dmsg0(200, "relock mutex\n"); 
-         if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
-            return NULL;
-         }
+        if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
+           return NULL;
+        }
          Dmsg0(200, "Done lock mutex\n");
       }
       /*
        * If no more work request, and we are asked to quit, then do it
        */
       if (wq->first == NULL && wq->quit) {
-         wq->num_workers--;
-         if (wq->num_workers == 0) {
+        wq->num_workers--;
+        if (wq->num_workers == 0) {
             Dmsg0(200, "Wake up destroy routine\n");
-            /* Wake up destroy routine if he is waiting */
-            pthread_cond_broadcast(&wq->work);
-         }
+           /* Wake up destroy routine if he is waiting */
+           pthread_cond_broadcast(&wq->work);
+        }
          Dmsg0(200, "Unlock mutex\n");
-         pthread_mutex_unlock(&wq->mutex);
+        pthread_mutex_unlock(&wq->mutex);
          Dmsg0(200, "Return from workq_server\n");
-         return NULL;
+        return NULL;
       }
       Dmsg0(200, "Check for work request\n");
       /* 
@@ -375,8 +375,8 @@ static void *workq_server(void *arg)
       Dmsg1(200, "timedout=%d\n", timedout);
       if (wq->first == NULL && timedout) {
          Dmsg0(200, "break big loop\n");
-         wq->num_workers--;
-         break;
+        wq->num_workers--;
+        break;
       }
       Dmsg0(200, "Loop again\n");
    } /* end of big for loop */