* Initialize a work queue
*
* Returns: 0 on success
- * errno on failure
+ * errno on failure
*/
int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
{
}
wq->quit = 0;
wq->first = wq->last = NULL;
- wq->max_workers = threads; /* max threads to create */
- wq->num_workers = 0; /* no threads yet */
- wq->idle_workers = 0; /* no idle threads */
- wq->engine = engine; /* routine to run */
+ wq->max_workers = threads; /* max threads to create */
+ wq->num_workers = 0; /* no threads yet */
+ wq->idle_workers = 0; /* no idle threads */
+ wq->engine = engine; /* routine to run */
wq->valid = WORKQ_VALID;
return 0;
}
* Destroy a work queue
*
* Returns: 0 on success
- * errno on failure
+ * errno on failure
*/
int workq_destroy(workq_t *wq)
{
if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
return stat;
}
- wq->valid = 0; /* prevent any more operations */
+ wq->valid = 0; /* prevent any more operations */
/*
* If any threads are active, wake them
if (wq->num_workers > 0) {
wq->quit = 1;
if (wq->idle_workers) {
- if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
- pthread_mutex_unlock(&wq->mutex);
- return stat;
- }
+ if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
+ pthread_mutex_unlock(&wq->mutex);
+ return stat;
+ }
}
while (wq->num_workers > 0) {
- if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
- pthread_mutex_unlock(&wq->mutex);
- return stat;
- }
+ if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
+ pthread_mutex_unlock(&wq->mutex);
+ return stat;
+ }
}
}
if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
return stat;
}
- stat = pthread_mutex_destroy(&wq->mutex);
+ stat = pthread_mutex_destroy(&wq->mutex);
stat1 = pthread_cond_destroy(&wq->work);
stat2 = pthread_attr_destroy(&wq->attr);
return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
* Add work to a queue
* wq is a queue that was created with workq_init
* element is a user unique item that will be passed to the
- * processing routine
+ * processing routine
* work_item will get internal work queue item -- if it is not NULL
* priority if non-zero will cause the item to be placed on the
- * head of the list instead of the tail.
+ * head of the list instead of the tail.
*/
int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
{
if (priority) {
/* Add to head of queue */
if (wq->first == NULL) {
- wq->first = item;
- wq->last = item;
+ wq->first = item;
+ wq->last = item;
} else {
- item->next = wq->first;
- wq->first = item;
+ item->next = wq->first;
+ wq->first = item;
}
} else {
/* Add to end of queue */
if (wq->first == NULL) {
- wq->first = item;
+ wq->first = item;
} else {
- wq->last->next = item;
+ wq->last->next = item;
}
wq->last = item;
}
/* if any threads are idle, wake one */
if (wq->idle_workers > 0) {
Dmsg0(1400, "Signal worker\n");
- if ((stat = pthread_cond_signal(&wq->work)) != 0) {
- pthread_mutex_unlock(&wq->mutex);
- return stat;
+ if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
+ pthread_mutex_unlock(&wq->mutex);
+ return stat;
}
} else if (wq->num_workers < wq->max_workers) {
Dmsg0(1400, "Create worker thread\n");
/* No idle threads so create a new one */
set_thread_concurrency(wq->max_workers + 1);
if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
- pthread_mutex_unlock(&wq->mutex);
- return stat;
+ pthread_mutex_unlock(&wq->mutex);
+ return stat;
}
wq->num_workers++;
}
for (prev=item=wq->first; item; item=item->next) {
if (item == work_item) {
- found = 1;
- break;
+ found = 1;
+ break;
}
prev = item;
}
if (wq->first != work_item) {
prev->next = work_item->next;
if (wq->last == work_item) {
- wq->last = prev;
+ wq->last = prev;
}
work_item->next = wq->first;
wq->first = work_item;
/* if any threads are idle, wake one */
if (wq->idle_workers > 0) {
Dmsg0(1400, "Signal worker\n");
- if ((stat = pthread_cond_signal(&wq->work)) != 0) {
- pthread_mutex_unlock(&wq->mutex);
- return stat;
+ if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
+ pthread_mutex_unlock(&wq->mutex);
+ return stat;
}
} else {
Dmsg0(1400, "Create worker thread\n");
/* No idle threads so create a new one */
set_thread_concurrency(wq->max_workers + 1);
if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
- pthread_mutex_unlock(&wq->mutex);
- return stat;
+ pthread_mutex_unlock(&wq->mutex);
+ return stat;
}
wq->num_workers++;
}
timeout.tv_sec = tv.tv_sec + 2;
while (wq->first == NULL && !wq->quit) {
- /*
- * Wait 2 seconds, then if no more work, exit
- */
+ /*
+ * Wait 2 seconds, then if no more work, exit
+ */
Dmsg0(1400, "pthread_cond_timedwait()\n");
#ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN
- /* CYGWIN dies with a page fault the second
- * time that pthread_cond_timedwait() is called
- * so fake it out.
- */
- pthread_mutex_lock(&wq->mutex);
- stat = ETIMEDOUT;
+ /* CYGWIN dies with a page fault the second
+ * time that pthread_cond_timedwait() is called
+ * so fake it out.
+ */
+ pthread_mutex_lock(&wq->mutex);
+ stat = ETIMEDOUT;
#else
- stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
+ stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
#endif
Dmsg1(1400, "timedwait=%d\n", stat);
- if (stat == ETIMEDOUT) {
- timedout = 1;
- break;
- } else if (stat != 0) {
+ if (stat == ETIMEDOUT) {
+ timedout = 1;
+ break;
+ } else if (stat != 0) {
/* This shouldn't happen */
Dmsg0(1400, "This shouldn't happen\n");
- wq->num_workers--;
- pthread_mutex_unlock(&wq->mutex);
- return NULL;
- }
+ wq->num_workers--;
+ pthread_mutex_unlock(&wq->mutex);
+ return NULL;
+ }
}
we = wq->first;
if (we != NULL) {
- wq->first = we->next;
- if (wq->last == we) {
- wq->last = NULL;
- }
- if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
- return NULL;
- }
+ wq->first = we->next;
+ if (wq->last == we) {
+ wq->last = NULL;
+ }
+ if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
+ return NULL;
+ }
/* Call user's routine here */
Dmsg0(1400, "Calling user engine.\n");
- wq->engine(we->data);
+ wq->engine(we->data);
Dmsg0(1400, "Back from user engine.\n");
- free(we); /* release work entry */
+ free(we); /* release work entry */
Dmsg0(1400, "relock mutex\n");
- if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
- return NULL;
- }
+ if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
+ return NULL;
+ }
Dmsg0(1400, "Done lock mutex\n");
}
/*
* If no more work request, and we are asked to quit, then do it
*/
if (wq->first == NULL && wq->quit) {
- wq->num_workers--;
- if (wq->num_workers == 0) {
+ wq->num_workers--;
+ if (wq->num_workers == 0) {
Dmsg0(1400, "Wake up destroy routine\n");
- /* Wake up destroy routine if he is waiting */
- pthread_cond_broadcast(&wq->work);
- }
+ /* Wake up destroy routine if he is waiting */
+ pthread_cond_broadcast(&wq->work);
+ }
Dmsg0(1400, "Unlock mutex\n");
- pthread_mutex_unlock(&wq->mutex);
+ pthread_mutex_unlock(&wq->mutex);
Dmsg0(1400, "Return from workq_server\n");
- return NULL;
+ return NULL;
}
Dmsg0(1400, "Check for work request\n");
/*
Dmsg1(1400, "timedout=%d\n", timedout);
if (wq->first == NULL && timedout) {
Dmsg0(1400, "break big loop\n");
- wq->num_workers--;
- break;
+ wq->num_workers--;
+ break;
}
Dmsg0(1400, "Loop again\n");
} /* end of big for loop */