X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=bacula%2Fsrc%2Flib%2Fworkq.c;h=dbe0001ffff78551386943c16b80cbad422a56da;hb=ed89319bc77d781ff5bebba3fd48593aab15ef53;hp=b076aca7da3daedce34667427a4ce736ab232750;hpb=e8ffddd9ef03a90976b37011932ea17e1a7833de;p=bacula%2Fbacula diff --git a/bacula/src/lib/workq.c b/bacula/src/lib/workq.c index b076aca7da..dbe0001fff 100755 --- a/bacula/src/lib/workq.c +++ b/bacula/src/lib/workq.c @@ -58,7 +58,7 @@ static void *workq_server(void *arg); * Returns: 0 on success * errno on failure */ -int workq_init(workq_t *wq, int threads, void (*engine)(void *arg)) +int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg)) { int stat; @@ -136,8 +136,14 @@ int workq_destroy(workq_t *wq) /* * Add work to a queue + * wq is a queue that was created with workq_init + * element is a user unique item that will be passed to the + * processing routine + * work_item will get internal work queue item -- if it is not NULL + * priority if non-zero will cause the item to be placed on the + * head of the list instead of the tail. */ -int workq_add(workq_t *wq, void *element) +int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority) { int stat; workq_ele_t *item; @@ -148,7 +154,7 @@ int workq_add(workq_t *wq, void *element) return EINVAL; } - if ((item = (workq_ele_t *) malloc(sizeof(workq_ele_t))) == NULL) { + if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) { return ENOMEM; } item->data = element; @@ -159,13 +165,24 @@ int workq_add(workq_t *wq, void *element) } Dmsg0(200, "add item to queue\n"); - /* Add the new item to the end of the queue */ - if (wq->first == NULL) { - wq->first = item; + if (priority) { + /* Add to head of queue */ + if (wq->first == NULL) { + wq->first = item; + wq->last = item; + } else { + item->next = wq->first; + wq->first = item; + } } else { - wq->last->next = item; + /* Add to end of queue */ + if (wq->first == NULL) { + wq->first = item; + } else { + wq->last->next = item; + } + wq->last = item; } - wq->last = item; /* if any threads are idle, wake one */ if (wq->idle_workers > 0) { @@ -186,9 +203,81 @@ int workq_add(workq_t *wq, void *element) } pthread_mutex_unlock(&wq->mutex); Dmsg0(200, "Return workq_add\n"); + /* Return work_item if requested */ + if (work_item) { + *work_item = item; + } return stat; } +/* + * Remove work from a queue + * wq is a queue that was created with workq_init + * work_item is an element of work + * + * Note, it is "removed" by immediately calling a processing routine. + * if you want to cancel it, you need to provide some external means + * of doing so. + */ +int workq_remove(workq_t *wq, workq_ele_t *work_item) +{ + int stat, found = 0; + pthread_t id; + workq_ele_t *item, *prev; + + Dmsg0(200, "workq_remove\n"); + if (wq->valid != WORKQ_VALID) { + return EINVAL; + } + + if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) { + return stat; + } + + for (prev=item=wq->first; item; item=item->next) { + if (item == work_item) { + found = 1; + break; + } + prev = item; + } + if (!found) { + return EINVAL; + } + + /* Move item to be first on list */ + if (wq->first != work_item) { + prev->next = work_item->next; + if (wq->last == work_item) { + wq->last = prev; + } + work_item->next = wq->first; + wq->first = work_item; + } + + /* if any threads are idle, wake one */ + if (wq->idle_workers > 0) { + Dmsg0(200, "Signal worker\n"); + if ((stat = pthread_cond_signal(&wq->work)) != 0) { + pthread_mutex_unlock(&wq->mutex); + return stat; + } + } else { + Dmsg0(200, "Create worker thread\n"); + /* No idle threads so create a new one */ + set_thread_concurrency(wq->max_workers + 1); + if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) { + pthread_mutex_unlock(&wq->mutex); + return stat; + } + wq->num_workers++; + } + pthread_mutex_unlock(&wq->mutex); + Dmsg0(200, "Return workq_remove\n"); + return stat; +} + + /* * This is the worker thread that serves the work queue. * In due course, it will call the user's engine.