*
* Kern Sibbald, January MMI
*
+ * Version $Id$
+ *
* This code adapted from "Programming with POSIX Threads", by
* David R. Butenhof
*
*
*/
/*
- Copyright (C) 2000, 2001, 2002 Kern Sibbald and John Walker
+ Copyright (C) 2000-2003 Kern Sibbald and John Walker
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
* Returns: 0 on success
* errno on failure
*/
-int workq_init(workq_t *wq, int threads, void (*engine)(void *arg))
+int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
{
int stat;
/*
* Add work to a queue
+ * wq is a queue that was created with workq_init
+ * element is a user unique item that will be passed to the
+ * processing routine
+ * work_item will get internal work queue item -- if it is not NULL
+ * priority if non-zero will cause the item to be placed on the
+ * head of the list instead of the tail.
*/
-int workq_add(workq_t *wq, void *element)
+int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
{
int stat;
workq_ele_t *item;
return EINVAL;
}
- if ((item = (workq_ele_t *) malloc(sizeof(workq_ele_t))) == NULL) {
+ if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
return ENOMEM;
}
item->data = element;
}
Dmsg0(200, "add item to queue\n");
- /* Add the new item to the end of the queue */
- if (wq->first == NULL) {
- wq->first = item;
+ if (priority) {
+ /* Add to head of queue */
+ if (wq->first == NULL) {
+ wq->first = item;
+ wq->last = item;
+ } else {
+ item->next = wq->first;
+ wq->first = item;
+ }
} else {
- wq->last->next = item;
+ /* Add to end of queue */
+ if (wq->first == NULL) {
+ wq->first = item;
+ } else {
+ wq->last->next = item;
+ }
+ wq->last = item;
}
- wq->last = item;
/* if any threads are idle, wake one */
if (wq->idle_workers > 0) {
}
pthread_mutex_unlock(&wq->mutex);
Dmsg0(200, "Return workq_add\n");
+ /* Return work_item if requested */
+ if (work_item) {
+ *work_item = item;
+ }
+ return stat;
+}
+
+/*
+ * Remove work from a queue
+ * wq is a queue that was created with workq_init
+ * work_item is an element of work
+ *
+ * Note, it is "removed" by immediately calling a processing routine.
+ * if you want to cancel it, you need to provide some external means
+ * of doing so.
+ */
+int workq_remove(workq_t *wq, workq_ele_t *work_item)
+{
+ int stat, found = 0;
+ pthread_t id;
+ workq_ele_t *item, *prev;
+
+ Dmsg0(200, "workq_remove\n");
+ if (wq->valid != WORKQ_VALID) {
+ return EINVAL;
+ }
+
+ if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
+ return stat;
+ }
+
+ for (prev=item=wq->first; item; item=item->next) {
+ if (item == work_item) {
+ found = 1;
+ break;
+ }
+ prev = item;
+ }
+ if (!found) {
+ return EINVAL;
+ }
+
+ /* Move item to be first on list */
+ if (wq->first != work_item) {
+ prev->next = work_item->next;
+ if (wq->last == work_item) {
+ wq->last = prev;
+ }
+ work_item->next = wq->first;
+ wq->first = work_item;
+ }
+
+ /* if any threads are idle, wake one */
+ if (wq->idle_workers > 0) {
+ Dmsg0(200, "Signal worker\n");
+ if ((stat = pthread_cond_signal(&wq->work)) != 0) {
+ pthread_mutex_unlock(&wq->mutex);
+ return stat;
+ }
+ } else {
+ Dmsg0(200, "Create worker thread\n");
+ /* No idle threads so create a new one */
+ set_thread_concurrency(wq->max_workers + 1);
+ if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
+ pthread_mutex_unlock(&wq->mutex);
+ return stat;
+ }
+ wq->num_workers++;
+ }
+ pthread_mutex_unlock(&wq->mutex);
+ Dmsg0(200, "Return workq_remove\n");
return stat;
}
+
/*
* This is the worker thread that serves the work queue.
* In due course, it will call the user's engine.