/* Forward referenced functions */
extern "C" void *workq_server(void *arg);
-/*
+/*
* Initialize a work queue
*
* Returns: 0 on success
int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
{
int stat;
-
+
if ((stat = pthread_attr_init(&wq->attr)) != 0) {
return stat;
}
wq->num_workers = 0; /* no threads yet */
wq->idle_workers = 0; /* no idle threads */
wq->engine = engine; /* routine to run */
- wq->valid = WORKQ_VALID;
+ wq->valid = WORKQ_VALID;
return 0;
}
}
wq->valid = 0; /* prevent any more operations */
- /*
- * If any threads are active, wake them
+ /*
+ * If any threads are active, wake them
*/
if (wq->num_workers > 0) {
wq->quit = 1;
/*
* Add work to a queue
* wq is a queue that was created with workq_init
- * element is a user unique item that will be passed to the
+ * element is a user unique item that will be passed to the
* processing routine
* work_item will get internal work queue item -- if it is not NULL
* priority if non-zero will cause the item to be placed on the
int stat;
workq_ele_t *item;
pthread_t id;
-
- Dmsg0(400, "workq_add\n");
+
+ Dmsg0(1400, "workq_add\n");
if (wq->valid != WORKQ_VALID) {
return EINVAL;
}
return stat;
}
- Dmsg0(400, "add item to queue\n");
+ Dmsg0(1400, "add item to queue\n");
if (priority) {
/* Add to head of queue */
if (wq->first == NULL) {
/* if any threads are idle, wake one */
if (wq->idle_workers > 0) {
- Dmsg0(400, "Signal worker\n");
+ Dmsg0(1400, "Signal worker\n");
if ((stat = pthread_cond_signal(&wq->work)) != 0) {
pthread_mutex_unlock(&wq->mutex);
return stat;
}
} else if (wq->num_workers < wq->max_workers) {
- Dmsg0(400, "Create worker thread\n");
+ Dmsg0(1400, "Create worker thread\n");
/* No idle threads so create a new one */
set_thread_concurrency(wq->max_workers + 1);
if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
wq->num_workers++;
}
pthread_mutex_unlock(&wq->mutex);
- Dmsg0(400, "Return workq_add\n");
+ Dmsg0(1400, "Return workq_add\n");
/* Return work_item if requested */
if (work_item) {
*work_item = item;
int stat, found = 0;
pthread_t id;
workq_ele_t *item, *prev;
-
- Dmsg0(400, "workq_remove\n");
+
+ Dmsg0(1400, "workq_remove\n");
if (wq->valid != WORKQ_VALID) {
return EINVAL;
}
if (!found) {
return EINVAL;
}
-
+
/* Move item to be first on list */
if (wq->first != work_item) {
- prev->next = work_item->next;
+ prev->next = work_item->next;
if (wq->last == work_item) {
wq->last = prev;
}
/* if any threads are idle, wake one */
if (wq->idle_workers > 0) {
- Dmsg0(400, "Signal worker\n");
+ Dmsg0(1400, "Signal worker\n");
if ((stat = pthread_cond_signal(&wq->work)) != 0) {
pthread_mutex_unlock(&wq->mutex);
return stat;
}
} else {
- Dmsg0(400, "Create worker thread\n");
+ Dmsg0(1400, "Create worker thread\n");
/* No idle threads so create a new one */
set_thread_concurrency(wq->max_workers + 1);
if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
wq->num_workers++;
}
pthread_mutex_unlock(&wq->mutex);
- Dmsg0(400, "Return workq_remove\n");
+ Dmsg0(1400, "Return workq_remove\n");
return stat;
}
-/*
+/*
* This is the worker thread that serves the work queue.
* In due course, it will call the user's engine.
*/
workq_ele_t *we;
int stat, timedout;
- Dmsg0(400, "Start workq_server\n");
+ Dmsg0(1400, "Start workq_server\n");
if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
return NULL;
}
struct timeval tv;
struct timezone tz;
- Dmsg0(400, "Top of for loop\n");
+ Dmsg0(1400, "Top of for loop\n");
timedout = 0;
- Dmsg0(400, "gettimeofday()\n");
+ Dmsg0(1400, "gettimeofday()\n");
gettimeofday(&tv, &tz);
timeout.tv_nsec = 0;
timeout.tv_sec = tv.tv_sec + 2;
/*
* Wait 2 seconds, then if no more work, exit
*/
- Dmsg0(400, "pthread_cond_timedwait()\n");
+ Dmsg0(1400, "pthread_cond_timedwait()\n");
#ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN
/* CYGWIN dies with a page fault the second
* time that pthread_cond_timedwait() is called
#else
stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
#endif
- Dmsg1(400, "timedwait=%d\n", stat);
+ Dmsg1(1400, "timedwait=%d\n", stat);
if (stat == ETIMEDOUT) {
timedout = 1;
break;
} else if (stat != 0) {
/* This shouldn't happen */
- Dmsg0(400, "This shouldn't happen\n");
+ Dmsg0(1400, "This shouldn't happen\n");
wq->num_workers--;
pthread_mutex_unlock(&wq->mutex);
return NULL;
}
- }
+ }
we = wq->first;
if (we != NULL) {
wq->first = we->next;
return NULL;
}
/* Call user's routine here */
- Dmsg0(400, "Calling user engine.\n");
+ Dmsg0(1400, "Calling user engine.\n");
wq->engine(we->data);
- Dmsg0(400, "Back from user engine.\n");
+ Dmsg0(1400, "Back from user engine.\n");
free(we); /* release work entry */
- Dmsg0(400, "relock mutex\n");
+ Dmsg0(1400, "relock mutex\n");
if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
return NULL;
}
- Dmsg0(400, "Done lock mutex\n");
+ Dmsg0(1400, "Done lock mutex\n");
}
/*
* If no more work request, and we are asked to quit, then do it
if (wq->first == NULL && wq->quit) {
wq->num_workers--;
if (wq->num_workers == 0) {
- Dmsg0(400, "Wake up destroy routine\n");
+ Dmsg0(1400, "Wake up destroy routine\n");
/* Wake up destroy routine if he is waiting */
pthread_cond_broadcast(&wq->work);
}
- Dmsg0(400, "Unlock mutex\n");
+ Dmsg0(1400, "Unlock mutex\n");
pthread_mutex_unlock(&wq->mutex);
- Dmsg0(400, "Return from workq_server\n");
+ Dmsg0(1400, "Return from workq_server\n");
return NULL;
}
- Dmsg0(400, "Check for work request\n");
- /*
+ Dmsg0(1400, "Check for work request\n");
+ /*
* If no more work requests, and we waited long enough, quit
*/
- Dmsg1(400, "wq->first==NULL = %d\n", wq->first==NULL);
- Dmsg1(400, "timedout=%d\n", timedout);
+ Dmsg1(1400, "wq->first==NULL = %d\n", wq->first==NULL);
+ Dmsg1(1400, "timedout=%d\n", timedout);
if (wq->first == NULL && timedout) {
- Dmsg0(400, "break big loop\n");
+ Dmsg0(1400, "break big loop\n");
wq->num_workers--;
break;
}
- Dmsg0(400, "Loop again\n");
+ Dmsg0(1400, "Loop again\n");
} /* end of big for loop */
- Dmsg0(400, "unlock mutex\n");
+ Dmsg0(1400, "unlock mutex\n");
pthread_mutex_unlock(&wq->mutex);
- Dmsg0(400, "End workq_server\n");
+ Dmsg0(1400, "End workq_server\n");
return NULL;
}