/*
 * Bacula work queue routines. Permits passing work to
 *  multiple worker threads.
 *
 *  Kern Sibbald, January MMI
 *
 *  This code adapted from "Programming with POSIX Threads", by
 *    David R. Butenhof
 *
 * Example:
 *
 * static workq_t job_wq;    define work queue
 *
 *  Initialize the queue
 *  if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
 *     Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", strerror(stat));
 *  }
 *
 *  Add an item to the queue (tail of the queue, no item handle wanted)
 *  if ((stat = workq_add(&job_wq, (void *)jcr, NULL, 0)) != 0) {
 *     Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", strerror(stat));
 *  }
 *
 *  Terminate the queue
 *  workq_destroy(&job_wq);
 *
 */
31 Copyright (C) 2000-2003 Kern Sibbald and John Walker
33 This program is free software; you can redistribute it and/or
34 modify it under the terms of the GNU General Public License as
35 published by the Free Software Foundation; either version 2 of
36 the License, or (at your option) any later version.
38 This program is distributed in the hope that it will be useful,
39 but WITHOUT ANY WARRANTY; without even the implied warranty of
40 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
41 General Public License for more details.
43 You should have received a copy of the GNU General Public
44 License along with this program; if not, write to the Free
45 Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
/* Forward referenced functions */
static void *workq_server(void *arg);   /* worker thread body, defined below */
56 * Initialize a work queue
58 * Returns: 0 on success
61 int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
65 if ((stat = pthread_attr_init(&wq->attr)) != 0) {
68 if ((stat = pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
69 pthread_attr_destroy(&wq->attr);
72 if ((stat = pthread_mutex_init(&wq->mutex, NULL)) != 0) {
73 pthread_attr_destroy(&wq->attr);
76 if ((stat = pthread_cond_init(&wq->work, NULL)) != 0) {
77 pthread_mutex_destroy(&wq->mutex);
78 pthread_attr_destroy(&wq->attr);
82 wq->first = wq->last = NULL;
83 wq->max_workers = threads; /* max threads to create */
84 wq->num_workers = 0; /* no threads yet */
85 wq->idle_workers = 0; /* no idle threads */
86 wq->engine = engine; /* routine to run */
87 wq->valid = WORKQ_VALID;
92 * Destroy a work queue
94 * Returns: 0 on success
97 int workq_destroy(workq_t *wq)
99 int stat, stat1, stat2;
101 if (wq->valid != WORKQ_VALID) {
104 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
107 wq->valid = 0; /* prevent any more operations */
110 * If any threads are active, wake them
112 if (wq->num_workers > 0) {
114 if (wq->idle_workers) {
115 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
116 pthread_mutex_unlock(&wq->mutex);
120 while (wq->num_workers > 0) {
121 if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
122 pthread_mutex_unlock(&wq->mutex);
127 if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
130 stat = pthread_mutex_destroy(&wq->mutex);
131 stat1 = pthread_cond_destroy(&wq->work);
132 stat2 = pthread_attr_destroy(&wq->attr);
133 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
138 * Add work to a queue
139 * wq is a queue that was created with workq_init
140 * element is a user unique item that will be passed to the
142 * work_item will get internal work queue item -- if it is not NULL
143 * priority if non-zero will cause the item to be placed on the
144 * head of the list instead of the tail.
146 int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
152 Dmsg0(200, "workq_add\n");
153 if (wq->valid != WORKQ_VALID) {
157 if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
160 item->data = element;
162 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
167 Dmsg0(200, "add item to queue\n");
169 /* Add to head of queue */
170 if (wq->first == NULL) {
174 item->next = wq->first;
178 /* Add to end of queue */
179 if (wq->first == NULL) {
182 wq->last->next = item;
187 /* if any threads are idle, wake one */
188 if (wq->idle_workers > 0) {
189 Dmsg0(200, "Signal worker\n");
190 if ((stat = pthread_cond_signal(&wq->work)) != 0) {
191 pthread_mutex_unlock(&wq->mutex);
194 } else if (wq->num_workers < wq->max_workers) {
195 Dmsg0(200, "Create worker thread\n");
196 /* No idle threads so create a new one */
197 set_thread_concurrency(wq->max_workers + 1);
198 if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
199 pthread_mutex_unlock(&wq->mutex);
204 pthread_mutex_unlock(&wq->mutex);
205 Dmsg0(200, "Return workq_add\n");
206 /* Return work_item if requested */
214 * Remove work from a queue
215 * wq is a queue that was created with workq_init
216 * work_item is an element of work
218 * Note, it is "removed" by immediately calling a processing routine.
219 * if you want to cancel it, you need to provide some external means
222 int workq_remove(workq_t *wq, workq_ele_t *work_item)
226 workq_ele_t *item, *prev;
228 Dmsg0(200, "workq_remove\n");
229 if (wq->valid != WORKQ_VALID) {
233 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
237 for (prev=item=wq->first; item; item=item->next) {
238 if (item == work_item) {
248 /* Move item to be first on list */
249 if (wq->first != work_item) {
250 prev->next = work_item->next;
251 if (wq->last == work_item) {
254 work_item->next = wq->first;
255 wq->first = work_item;
258 /* if any threads are idle, wake one */
259 if (wq->idle_workers > 0) {
260 Dmsg0(200, "Signal worker\n");
261 if ((stat = pthread_cond_signal(&wq->work)) != 0) {
262 pthread_mutex_unlock(&wq->mutex);
266 Dmsg0(200, "Create worker thread\n");
267 /* No idle threads so create a new one */
268 set_thread_concurrency(wq->max_workers + 1);
269 if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
270 pthread_mutex_unlock(&wq->mutex);
275 pthread_mutex_unlock(&wq->mutex);
276 Dmsg0(200, "Return workq_remove\n");
282 * This is the worker thread that serves the work queue.
283 * In due course, it will call the user's engine.
285 static void *workq_server(void *arg)
287 struct timespec timeout;
288 workq_t *wq = (workq_t *)arg;
292 Dmsg0(200, "Start workq_server\n");
293 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
301 Dmsg0(200, "Top of for loop\n");
303 Dmsg0(200, "gettimeofday()\n");
304 gettimeofday(&tv, &tz);
306 timeout.tv_sec = tv.tv_sec + 2;
308 while (wq->first == NULL && !wq->quit) {
310 * Wait 2 seconds, then if no more work, exit
312 Dmsg0(200, "pthread_cond_timedwait()\n");
313 #ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN
314 /* CYGWIN dies with a page fault the second
315 * time that pthread_cond_timedwait() is called
318 pthread_mutex_lock(&wq->mutex);
321 stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
323 Dmsg1(200, "timedwait=%d\n", stat);
324 if (stat == ETIMEDOUT) {
327 } else if (stat != 0) {
328 /* This shouldn't happen */
329 Dmsg0(200, "This shouldn't happen\n");
331 pthread_mutex_unlock(&wq->mutex);
337 wq->first = we->next;
338 if (wq->last == we) {
341 if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
344 /* Call user's routine here */
345 Dmsg0(200, "Calling user engine.\n");
346 wq->engine(we->data);
347 Dmsg0(200, "Back from user engine.\n");
348 free(we); /* release work entry */
349 Dmsg0(200, "relock mutex\n");
350 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
353 Dmsg0(200, "Done lock mutex\n");
356 * If no more work request, and we are asked to quit, then do it
358 if (wq->first == NULL && wq->quit) {
360 if (wq->num_workers == 0) {
361 Dmsg0(200, "Wake up destroy routine\n");
362 /* Wake up destroy routine if he is waiting */
363 pthread_cond_broadcast(&wq->work);
365 Dmsg0(200, "Unlock mutex\n");
366 pthread_mutex_unlock(&wq->mutex);
367 Dmsg0(200, "Return from workq_server\n");
370 Dmsg0(200, "Check for work request\n");
372 * If no more work requests, and we waited long enough, quit
374 Dmsg1(200, "wq->first==NULL = %d\n", wq->first==NULL);
375 Dmsg1(200, "timedout=%d\n", timedout);
376 if (wq->first == NULL && timedout) {
377 Dmsg0(200, "break big loop\n");
381 Dmsg0(200, "Loop again\n");
382 } /* end of big for loop */
384 Dmsg0(200, "unlock mutex\n");
385 pthread_mutex_unlock(&wq->mutex);
386 Dmsg0(200, "End workq_server\n");