2 Bacula® - The Network Backup Solution
4 Copyright (C) 2001-2008 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula work queue routines. Permits passing work to
32 * Kern Sibbald, January MMI
36 * This code adapted from "Programming with POSIX Threads", by
41 * static workq_t job_wq; define work queue
44 * if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
46 * Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", be.bstrerror(errno));
49 * Add an item to the queue
50 * if ((stat = workq_add(&job_wq, (void *)jcr)) != 0) {
52 * Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", be.bstrerror(errno));
56 * workq_destroy(workq_t *wq);
63 /* Forward referenced functions */
64 extern "C" void *workq_server(void *arg);
67 * Initialize a work queue
69 * Returns: 0 on success
72 int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
76 if ((stat = pthread_attr_init(&wq->attr)) != 0) {
79 if ((stat = pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
80 pthread_attr_destroy(&wq->attr);
83 if ((stat = pthread_mutex_init(&wq->mutex, NULL)) != 0) {
84 pthread_attr_destroy(&wq->attr);
87 if ((stat = pthread_cond_init(&wq->work, NULL)) != 0) {
88 pthread_mutex_destroy(&wq->mutex);
89 pthread_attr_destroy(&wq->attr);
93 wq->first = wq->last = NULL;
94 wq->max_workers = threads; /* max threads to create */
95 wq->num_workers = 0; /* no threads yet */
96 wq->idle_workers = 0; /* no idle threads */
97 wq->engine = engine; /* routine to run */
98 wq->valid = WORKQ_VALID;
103 * Destroy a work queue
105 * Returns: 0 on success
108 int workq_destroy(workq_t *wq)
110 int stat, stat1, stat2;
112 if (wq->valid != WORKQ_VALID) {
115 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
118 wq->valid = 0; /* prevent any more operations */
121 * If any threads are active, wake them
123 if (wq->num_workers > 0) {
125 if (wq->idle_workers) {
126 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
127 pthread_mutex_unlock(&wq->mutex);
131 while (wq->num_workers > 0) {
132 if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
133 pthread_mutex_unlock(&wq->mutex);
138 if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
141 stat = pthread_mutex_destroy(&wq->mutex);
142 stat1 = pthread_cond_destroy(&wq->work);
143 stat2 = pthread_attr_destroy(&wq->attr);
144 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
149 * Add work to a queue
150 * wq is a queue that was created with workq_init
151 * element is a user unique item that will be passed to the
153 * work_item will get internal work queue item -- if it is not NULL
154 * priority if non-zero will cause the item to be placed on the
155 * head of the list instead of the tail.
157 int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
163 Dmsg0(1400, "workq_add\n");
164 if (wq->valid != WORKQ_VALID) {
168 if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
171 item->data = element;
173 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
178 Dmsg0(1400, "add item to queue\n");
180 /* Add to head of queue */
181 if (wq->first == NULL) {
185 item->next = wq->first;
189 /* Add to end of queue */
190 if (wq->first == NULL) {
193 wq->last->next = item;
198 /* if any threads are idle, wake one */
199 if (wq->idle_workers > 0) {
200 Dmsg0(1400, "Signal worker\n");
201 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
202 pthread_mutex_unlock(&wq->mutex);
205 } else if (wq->num_workers < wq->max_workers) {
206 Dmsg0(1400, "Create worker thread\n");
207 /* No idle threads so create a new one */
208 set_thread_concurrency(wq->max_workers + 1);
209 if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
210 pthread_mutex_unlock(&wq->mutex);
215 pthread_mutex_unlock(&wq->mutex);
216 Dmsg0(1400, "Return workq_add\n");
217 /* Return work_item if requested */
225 * Remove work from a queue
226 * wq is a queue that was created with workq_init
227 * work_item is an element of work
229 * Note, it is "removed" by immediately calling a processing routine.
230 * if you want to cancel it, you need to provide some external means
233 int workq_remove(workq_t *wq, workq_ele_t *work_item)
237 workq_ele_t *item, *prev;
239 Dmsg0(1400, "workq_remove\n");
240 if (wq->valid != WORKQ_VALID) {
244 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
248 for (prev=item=wq->first; item; item=item->next) {
249 if (item == work_item) {
259 /* Move item to be first on list */
260 if (wq->first != work_item) {
261 prev->next = work_item->next;
262 if (wq->last == work_item) {
265 work_item->next = wq->first;
266 wq->first = work_item;
269 /* if any threads are idle, wake one */
270 if (wq->idle_workers > 0) {
271 Dmsg0(1400, "Signal worker\n");
272 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
273 pthread_mutex_unlock(&wq->mutex);
277 Dmsg0(1400, "Create worker thread\n");
278 /* No idle threads so create a new one */
279 set_thread_concurrency(wq->max_workers + 1);
280 if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
281 pthread_mutex_unlock(&wq->mutex);
286 pthread_mutex_unlock(&wq->mutex);
287 Dmsg0(1400, "Return workq_remove\n");
293 * This is the worker thread that serves the work queue.
294 * In due course, it will call the user's engine.
297 void *workq_server(void *arg)
299 struct timespec timeout;
300 workq_t *wq = (workq_t *)arg;
304 Dmsg0(1400, "Start workq_server\n");
305 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
308 set_jcr_in_tsd(INVALID_JCR);
314 Dmsg0(1400, "Top of for loop\n");
316 Dmsg0(1400, "gettimeofday()\n");
317 gettimeofday(&tv, &tz);
319 timeout.tv_sec = tv.tv_sec + 2;
321 while (wq->first == NULL && !wq->quit) {
323 * Wait 2 seconds, then if no more work, exit
325 Dmsg0(1400, "pthread_cond_timedwait()\n");
326 #ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN
327 /* CYGWIN dies with a page fault the second
328 * time that pthread_cond_timedwait() is called
331 pthread_mutex_lock(&wq->mutex);
334 stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
336 Dmsg1(1400, "timedwait=%d\n", stat);
337 if (stat == ETIMEDOUT) {
340 } else if (stat != 0) {
341 /* This shouldn't happen */
342 Dmsg0(1400, "This shouldn't happen\n");
344 pthread_mutex_unlock(&wq->mutex);
350 wq->first = we->next;
351 if (wq->last == we) {
354 if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
357 /* Call user's routine here */
358 Dmsg0(1400, "Calling user engine.\n");
359 wq->engine(we->data);
360 Dmsg0(1400, "Back from user engine.\n");
361 free(we); /* release work entry */
362 Dmsg0(1400, "relock mutex\n");
363 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
366 Dmsg0(1400, "Done lock mutex\n");
369 * If no more work request, and we are asked to quit, then do it
371 if (wq->first == NULL && wq->quit) {
373 if (wq->num_workers == 0) {
374 Dmsg0(1400, "Wake up destroy routine\n");
375 /* Wake up destroy routine if he is waiting */
376 pthread_cond_broadcast(&wq->work);
378 Dmsg0(1400, "Unlock mutex\n");
379 pthread_mutex_unlock(&wq->mutex);
380 Dmsg0(1400, "Return from workq_server\n");
383 Dmsg0(1400, "Check for work request\n");
385 * If no more work requests, and we waited long enough, quit
387 Dmsg1(1400, "wq->first==NULL = %d\n", wq->first==NULL);
388 Dmsg1(1400, "timedout=%d\n", timedout);
389 if (wq->first == NULL && timedout) {
390 Dmsg0(1400, "break big loop\n");
394 Dmsg0(1400, "Loop again\n");
395 } /* end of big for loop */
397 Dmsg0(1400, "unlock mutex\n");
398 pthread_mutex_unlock(&wq->mutex);
399 Dmsg0(1400, "End workq_server\n");