]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/lib/workq.c
Final changes
[bacula/bacula] / bacula / src / lib / workq.c
index b076aca7da3daedce34667427a4ce736ab232750..dbe0001ffff78551386943c16b80cbad422a56da 100755 (executable)
@@ -58,7 +58,7 @@ static void *workq_server(void *arg);
  *  Returns: 0 on success
  *          errno on failure
  */
-int workq_init(workq_t *wq, int threads, void (*engine)(void *arg))
+int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
 {
    int stat;
                        
@@ -136,8 +136,14 @@ int workq_destroy(workq_t *wq)
 
 /*
  *  Add work to a queue
+ *    wq is a queue that was created with workq_init
+ *    element is a user unique item that will be passed to the 
+ *       processing routine
+ *    work_item will get internal work queue item -- if it is not NULL
+ *    priority if non-zero will cause the item to be placed on the
+ *       head of the list instead of the tail.
  */
-int workq_add(workq_t *wq, void *element)
+int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
 {
    int stat;
    workq_ele_t *item;
@@ -148,7 +154,7 @@ int workq_add(workq_t *wq, void *element)
       return EINVAL;
    }
 
-   if ((item = (workq_ele_t *) malloc(sizeof(workq_ele_t))) == NULL) {
+   if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
       return ENOMEM;
    }
    item->data = element;
@@ -159,13 +165,24 @@ int workq_add(workq_t *wq, void *element)
    }
 
    Dmsg0(200, "add item to queue\n");
-   /* Add the new item to the end of the queue */
-   if (wq->first == NULL) {
-      wq->first = item;
+   if (priority) {
+      /* Add to head of queue */
+      if (wq->first == NULL) {
+        wq->first = item;
+        wq->last = item;
+      } else {
+        item->next = wq->first;
+        wq->first = item;
+      }
    } else {
-      wq->last->next = item;
+      /* Add to end of queue */
+      if (wq->first == NULL) {
+        wq->first = item;
+      } else {
+        wq->last->next = item;
+      }
+      wq->last = item;
    }
-   wq->last = item;
 
    /* if any threads are idle, wake one */
    if (wq->idle_workers > 0) {
@@ -186,9 +203,81 @@ int workq_add(workq_t *wq, void *element)
    }
    pthread_mutex_unlock(&wq->mutex);
    Dmsg0(200, "Return workq_add\n");
+   /* Return work_item if requested */
+   if (work_item) {
+      *work_item = item;
+   }
    return stat;
 }
 
+/*
+ *  Remove work from a queue
+ *    wq is a queue that was created with workq_init
+ *    work_item is an element of work
+ *
+ *   Note, it is "removed" by immediately calling a processing routine.
+ *    if you want to cancel it, you need to provide some external means
+ *    of doing so.
+ */
+int workq_remove(workq_t *wq, workq_ele_t *work_item)
+{
+   int stat, found = 0;
+   pthread_t id;
+   workq_ele_t *item, *prev;
+    
+   Dmsg0(200, "workq_remove\n");
+   if (wq->valid != WORKQ_VALID) {
+      return EINVAL;
+   }
+
+   if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
+      return stat;
+   }
+
+   for (prev=item=wq->first; item; item=item->next) {
+      if (item == work_item) {
+        found = 1;
+        break;
+      }
+      prev = item;
+   }
+   if (!found) {
+      return EINVAL;
+   }
+   
+   /* Move item to be first on list */
+   if (wq->first != work_item) {
+      prev->next = work_item->next;   
+      if (wq->last == work_item) {
+        wq->last = prev;
+      }
+      work_item->next = wq->first;
+      wq->first = work_item;
+   }
+
+   /* if any threads are idle, wake one */
+   if (wq->idle_workers > 0) {
+      Dmsg0(200, "Signal worker\n");
+      if ((stat = pthread_cond_signal(&wq->work)) != 0) {
+        pthread_mutex_unlock(&wq->mutex);
+        return stat;
+      }
+   } else {
+      Dmsg0(200, "Create worker thread\n");
+      /* No idle threads so create a new one */
+      set_thread_concurrency(wq->max_workers + 1);
+      if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
+        pthread_mutex_unlock(&wq->mutex);
+        return stat;
+      }
+      wq->num_workers++;
+   }
+   pthread_mutex_unlock(&wq->mutex);
+   Dmsg0(200, "Return workq_remove\n");
+   return stat;
+}
+
+
 /* 
  * This is the worker thread that serves the work queue.
  * In due course, it will call the user's engine.