2 * Bacula work queue routines. Permits passing work to
5 * Kern Sibbald, January MMI
9 * This code adapted from "Programming with POSIX Threads", by
14 * static workq_t job_wq; define work queue
17 * if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
18 * Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", strerror(errno));
21 * Add an item to the queue
22 * if ((stat = workq_add(&job_wq, (void *)jcr)) != 0) {
23 * Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", strerror(errno));
27 * workq_destroy(workq_t *wq);
31 Copyright (C) 2000-2004 Kern Sibbald and John Walker
33 This program is free software; you can redistribute it and/or
34 modify it under the terms of the GNU General Public License as
35 published by the Free Software Foundation; either version 2 of
36 the License, or (at your option) any later version.
38 This program is distributed in the hope that it will be useful,
39 but WITHOUT ANY WARRANTY; without even the implied warranty of
40 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
41 General Public License for more details.
43 You should have received a copy of the GNU General Public
44 License along with this program; if not, write to the Free
45 Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
52 /* Forward referenced functions */
53 extern "C" void *workq_server(void *arg);
56 * Initialize a work queue
58 * Returns: 0 on success
61 int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
65 if ((stat = pthread_attr_init(&wq->attr)) != 0) {
68 if ((stat = pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
69 pthread_attr_destroy(&wq->attr);
72 if ((stat = pthread_mutex_init(&wq->mutex, NULL)) != 0) {
73 pthread_attr_destroy(&wq->attr);
76 if ((stat = pthread_cond_init(&wq->work, NULL)) != 0) {
77 pthread_mutex_destroy(&wq->mutex);
78 pthread_attr_destroy(&wq->attr);
82 wq->first = wq->last = NULL;
83 wq->max_workers = threads; /* max threads to create */
84 wq->num_workers = 0; /* no threads yet */
85 wq->idle_workers = 0; /* no idle threads */
86 wq->engine = engine; /* routine to run */
87 wq->valid = WORKQ_VALID;
92 * Destroy a work queue
94 * Returns: 0 on success
97 int workq_destroy(workq_t *wq)
99 int stat, stat1, stat2;
101 if (wq->valid != WORKQ_VALID) {
104 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
107 wq->valid = 0; /* prevent any more operations */
110 * If any threads are active, wake them
112 if (wq->num_workers > 0) {
114 if (wq->idle_workers) {
115 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
116 pthread_mutex_unlock(&wq->mutex);
120 while (wq->num_workers > 0) {
121 if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
122 pthread_mutex_unlock(&wq->mutex);
127 if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
130 stat = pthread_mutex_destroy(&wq->mutex);
131 stat1 = pthread_cond_destroy(&wq->work);
132 stat2 = pthread_attr_destroy(&wq->attr);
133 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
138 * Add work to a queue
139 * wq is a queue that was created with workq_init
140 * element is a user unique item that will be passed to the
142 * work_item will get internal work queue item -- if it is not NULL
143 * priority if non-zero will cause the item to be placed on the
144 * head of the list instead of the tail.
146 int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
152 Dmsg0(1400, "workq_add\n");
153 if (wq->valid != WORKQ_VALID) {
157 if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
160 item->data = element;
162 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
167 Dmsg0(1400, "add item to queue\n");
169 /* Add to head of queue */
170 if (wq->first == NULL) {
174 item->next = wq->first;
178 /* Add to end of queue */
179 if (wq->first == NULL) {
182 wq->last->next = item;
187 /* if any threads are idle, wake one */
188 if (wq->idle_workers > 0) {
189 Dmsg0(1400, "Signal worker\n");
190 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
191 pthread_mutex_unlock(&wq->mutex);
194 } else if (wq->num_workers < wq->max_workers) {
195 Dmsg0(1400, "Create worker thread\n");
196 /* No idle threads so create a new one */
197 set_thread_concurrency(wq->max_workers + 1);
198 if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
199 pthread_mutex_unlock(&wq->mutex);
204 pthread_mutex_unlock(&wq->mutex);
205 Dmsg0(1400, "Return workq_add\n");
206 /* Return work_item if requested */
214 * Remove work from a queue
215 * wq is a queue that was created with workq_init
216 * work_item is an element of work
218 * Note, it is "removed" by immediately calling a processing routine.
219 * if you want to cancel it, you need to provide some external means
222 int workq_remove(workq_t *wq, workq_ele_t *work_item)
226 workq_ele_t *item, *prev;
228 Dmsg0(1400, "workq_remove\n");
229 if (wq->valid != WORKQ_VALID) {
233 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
237 for (prev=item=wq->first; item; item=item->next) {
238 if (item == work_item) {
248 /* Move item to be first on list */
249 if (wq->first != work_item) {
250 prev->next = work_item->next;
251 if (wq->last == work_item) {
254 work_item->next = wq->first;
255 wq->first = work_item;
258 /* if any threads are idle, wake one */
259 if (wq->idle_workers > 0) {
260 Dmsg0(1400, "Signal worker\n");
261 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
262 pthread_mutex_unlock(&wq->mutex);
266 Dmsg0(1400, "Create worker thread\n");
267 /* No idle threads so create a new one */
268 set_thread_concurrency(wq->max_workers + 1);
269 if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
270 pthread_mutex_unlock(&wq->mutex);
275 pthread_mutex_unlock(&wq->mutex);
276 Dmsg0(1400, "Return workq_remove\n");
282 * This is the worker thread that serves the work queue.
283 * In due course, it will call the user's engine.
286 void *workq_server(void *arg)
288 struct timespec timeout;
289 workq_t *wq = (workq_t *)arg;
293 Dmsg0(1400, "Start workq_server\n");
294 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
302 Dmsg0(1400, "Top of for loop\n");
304 Dmsg0(1400, "gettimeofday()\n");
305 gettimeofday(&tv, &tz);
307 timeout.tv_sec = tv.tv_sec + 2;
309 while (wq->first == NULL && !wq->quit) {
311 * Wait 2 seconds, then if no more work, exit
313 Dmsg0(1400, "pthread_cond_timedwait()\n");
314 #ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN
315 /* CYGWIN dies with a page fault the second
316 * time that pthread_cond_timedwait() is called
319 pthread_mutex_lock(&wq->mutex);
322 stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
324 Dmsg1(1400, "timedwait=%d\n", stat);
325 if (stat == ETIMEDOUT) {
328 } else if (stat != 0) {
329 /* This shouldn't happen */
330 Dmsg0(1400, "This shouldn't happen\n");
332 pthread_mutex_unlock(&wq->mutex);
338 wq->first = we->next;
339 if (wq->last == we) {
342 if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
345 /* Call user's routine here */
346 Dmsg0(1400, "Calling user engine.\n");
347 wq->engine(we->data);
348 Dmsg0(1400, "Back from user engine.\n");
349 free(we); /* release work entry */
350 Dmsg0(1400, "relock mutex\n");
351 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
354 Dmsg0(1400, "Done lock mutex\n");
357 * If no more work request, and we are asked to quit, then do it
359 if (wq->first == NULL && wq->quit) {
361 if (wq->num_workers == 0) {
362 Dmsg0(1400, "Wake up destroy routine\n");
363 /* Wake up destroy routine if he is waiting */
364 pthread_cond_broadcast(&wq->work);
366 Dmsg0(1400, "Unlock mutex\n");
367 pthread_mutex_unlock(&wq->mutex);
368 Dmsg0(1400, "Return from workq_server\n");
371 Dmsg0(1400, "Check for work request\n");
373 * If no more work requests, and we waited long enough, quit
375 Dmsg1(1400, "wq->first==NULL = %d\n", wq->first==NULL);
376 Dmsg1(1400, "timedout=%d\n", timedout);
377 if (wq->first == NULL && timedout) {
378 Dmsg0(1400, "break big loop\n");
382 Dmsg0(1400, "Loop again\n");
383 } /* end of big for loop */
385 Dmsg0(1400, "unlock mutex\n");
386 pthread_mutex_unlock(&wq->mutex);
387 Dmsg0(1400, "End workq_server\n");