/*
 * Bacula work queue routines. Permits passing work to
 *  multiple threads.
 *
 *  Kern Sibbald, January MMI
 *
 *  This code adapted from "Programming with POSIX Threads", by
 *    David R. Butenhof
 *
 * Example:
 *
 * static workq_t job_wq;    define work queue
 *
 *  Initialize queue
 *  if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
 *     Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", strerror(stat));
 *  }
 *
 *  Add an item to the queue (no item handle wanted, normal priority)
 *  if ((stat = workq_add(&job_wq, (void *)jcr, NULL, 0)) != 0) {
 *     Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", strerror(stat));
 *  }
 *
 *  Terminate the queue
 *  workq_destroy(&job_wq);
 *
 */
31 Bacula® - The Network Backup Solution
33 Copyright (C) 2001-2006 Free Software Foundation Europe e.V.
35 The main author of Bacula is Kern Sibbald, with contributions from
36 many others, a complete list can be found in the file AUTHORS.
37 This program is Free Software; you can redistribute it and/or
38 modify it under the terms of version two of the GNU General Public
39 License as published by the Free Software Foundation plus additions
40 that are listed in the file LICENSE.
42 This program is distributed in the hope that it will be useful, but
43 WITHOUT ANY WARRANTY; without even the implied warranty of
44 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
45 General Public License for more details.
47 You should have received a copy of the GNU General Public License
48 along with this program; if not, write to the Free Software
49 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
52 Bacula® is a registered trademark of John Walker.
53 The licensor of Bacula is the Free Software Foundation Europe
54 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
55 Switzerland, email:ftf@fsfeurope.org.
60 /* Forward referenced functions */
61 extern "C" void *workq_server(void *arg);
64 * Initialize a work queue
66 * Returns: 0 on success
69 int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
73 if ((stat = pthread_attr_init(&wq->attr)) != 0) {
76 if ((stat = pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
77 pthread_attr_destroy(&wq->attr);
80 if ((stat = pthread_mutex_init(&wq->mutex, NULL)) != 0) {
81 pthread_attr_destroy(&wq->attr);
84 if ((stat = pthread_cond_init(&wq->work, NULL)) != 0) {
85 pthread_mutex_destroy(&wq->mutex);
86 pthread_attr_destroy(&wq->attr);
90 wq->first = wq->last = NULL;
91 wq->max_workers = threads; /* max threads to create */
92 wq->num_workers = 0; /* no threads yet */
93 wq->idle_workers = 0; /* no idle threads */
94 wq->engine = engine; /* routine to run */
95 wq->valid = WORKQ_VALID;
100 * Destroy a work queue
102 * Returns: 0 on success
105 int workq_destroy(workq_t *wq)
107 int stat, stat1, stat2;
109 if (wq->valid != WORKQ_VALID) {
112 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
115 wq->valid = 0; /* prevent any more operations */
118 * If any threads are active, wake them
120 if (wq->num_workers > 0) {
122 if (wq->idle_workers) {
123 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
124 pthread_mutex_unlock(&wq->mutex);
128 while (wq->num_workers > 0) {
129 if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
130 pthread_mutex_unlock(&wq->mutex);
135 if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
138 stat = pthread_mutex_destroy(&wq->mutex);
139 stat1 = pthread_cond_destroy(&wq->work);
140 stat2 = pthread_attr_destroy(&wq->attr);
141 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
146 * Add work to a queue
147 * wq is a queue that was created with workq_init
148 * element is a user unique item that will be passed to the
150 * work_item will get internal work queue item -- if it is not NULL
151 * priority if non-zero will cause the item to be placed on the
152 * head of the list instead of the tail.
154 int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
160 Dmsg0(1400, "workq_add\n");
161 if (wq->valid != WORKQ_VALID) {
165 if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
168 item->data = element;
170 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
175 Dmsg0(1400, "add item to queue\n");
177 /* Add to head of queue */
178 if (wq->first == NULL) {
182 item->next = wq->first;
186 /* Add to end of queue */
187 if (wq->first == NULL) {
190 wq->last->next = item;
195 /* if any threads are idle, wake one */
196 if (wq->idle_workers > 0) {
197 Dmsg0(1400, "Signal worker\n");
198 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
199 pthread_mutex_unlock(&wq->mutex);
202 } else if (wq->num_workers < wq->max_workers) {
203 Dmsg0(1400, "Create worker thread\n");
204 /* No idle threads so create a new one */
205 set_thread_concurrency(wq->max_workers + 1);
206 if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
207 pthread_mutex_unlock(&wq->mutex);
212 pthread_mutex_unlock(&wq->mutex);
213 Dmsg0(1400, "Return workq_add\n");
214 /* Return work_item if requested */
222 * Remove work from a queue
223 * wq is a queue that was created with workq_init
224 * work_item is an element of work
226 * Note, it is "removed" by immediately calling a processing routine.
227 * if you want to cancel it, you need to provide some external means
230 int workq_remove(workq_t *wq, workq_ele_t *work_item)
234 workq_ele_t *item, *prev;
236 Dmsg0(1400, "workq_remove\n");
237 if (wq->valid != WORKQ_VALID) {
241 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
245 for (prev=item=wq->first; item; item=item->next) {
246 if (item == work_item) {
256 /* Move item to be first on list */
257 if (wq->first != work_item) {
258 prev->next = work_item->next;
259 if (wq->last == work_item) {
262 work_item->next = wq->first;
263 wq->first = work_item;
266 /* if any threads are idle, wake one */
267 if (wq->idle_workers > 0) {
268 Dmsg0(1400, "Signal worker\n");
269 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
270 pthread_mutex_unlock(&wq->mutex);
274 Dmsg0(1400, "Create worker thread\n");
275 /* No idle threads so create a new one */
276 set_thread_concurrency(wq->max_workers + 1);
277 if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
278 pthread_mutex_unlock(&wq->mutex);
283 pthread_mutex_unlock(&wq->mutex);
284 Dmsg0(1400, "Return workq_remove\n");
290 * This is the worker thread that serves the work queue.
291 * In due course, it will call the user's engine.
294 void *workq_server(void *arg)
296 struct timespec timeout;
297 workq_t *wq = (workq_t *)arg;
301 Dmsg0(1400, "Start workq_server\n");
302 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
310 Dmsg0(1400, "Top of for loop\n");
312 Dmsg0(1400, "gettimeofday()\n");
313 gettimeofday(&tv, &tz);
315 timeout.tv_sec = tv.tv_sec + 2;
317 while (wq->first == NULL && !wq->quit) {
319 * Wait 2 seconds, then if no more work, exit
321 Dmsg0(1400, "pthread_cond_timedwait()\n");
322 #ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN
323 /* CYGWIN dies with a page fault the second
324 * time that pthread_cond_timedwait() is called
327 pthread_mutex_lock(&wq->mutex);
330 stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
332 Dmsg1(1400, "timedwait=%d\n", stat);
333 if (stat == ETIMEDOUT) {
336 } else if (stat != 0) {
337 /* This shouldn't happen */
338 Dmsg0(1400, "This shouldn't happen\n");
340 pthread_mutex_unlock(&wq->mutex);
346 wq->first = we->next;
347 if (wq->last == we) {
350 if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
353 /* Call user's routine here */
354 Dmsg0(1400, "Calling user engine.\n");
355 wq->engine(we->data);
356 Dmsg0(1400, "Back from user engine.\n");
357 free(we); /* release work entry */
358 Dmsg0(1400, "relock mutex\n");
359 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
362 Dmsg0(1400, "Done lock mutex\n");
365 * If no more work request, and we are asked to quit, then do it
367 if (wq->first == NULL && wq->quit) {
369 if (wq->num_workers == 0) {
370 Dmsg0(1400, "Wake up destroy routine\n");
371 /* Wake up destroy routine if he is waiting */
372 pthread_cond_broadcast(&wq->work);
374 Dmsg0(1400, "Unlock mutex\n");
375 pthread_mutex_unlock(&wq->mutex);
376 Dmsg0(1400, "Return from workq_server\n");
379 Dmsg0(1400, "Check for work request\n");
381 * If no more work requests, and we waited long enough, quit
383 Dmsg1(1400, "wq->first==NULL = %d\n", wq->first==NULL);
384 Dmsg1(1400, "timedout=%d\n", timedout);
385 if (wq->first == NULL && timedout) {
386 Dmsg0(1400, "break big loop\n");
390 Dmsg0(1400, "Loop again\n");
391 } /* end of big for loop */
393 Dmsg0(1400, "unlock mutex\n");
394 pthread_mutex_unlock(&wq->mutex);
395 Dmsg0(1400, "End workq_server\n");