2 Bacula® - The Network Backup Solution
4 Copyright (C) 2001-2008 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of John Walker.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula work queue routines. Permits passing work to
32 * Kern Sibbald, January MMI
36 * This code adapted from "Programming with POSIX Threads", by
41 * static workq_t job_wq; define work queue
44 * if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
46 * Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", be.bstrerror(errno));
49 * Add an item to the queue
50 * if ((stat = workq_add(&job_wq, (void *)jcr)) != 0) {
52 * Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", be.bstrerror(errno));
56 * workq_destroy(workq_t *wq);
62 /* Forward referenced functions */
63 extern "C" void *workq_server(void *arg);
66 * Initialize a work queue
68 * Returns: 0 on success
71 int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
75 if ((stat = pthread_attr_init(&wq->attr)) != 0) {
78 if ((stat = pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
79 pthread_attr_destroy(&wq->attr);
82 if ((stat = pthread_mutex_init(&wq->mutex, NULL)) != 0) {
83 pthread_attr_destroy(&wq->attr);
86 if ((stat = pthread_cond_init(&wq->work, NULL)) != 0) {
87 pthread_mutex_destroy(&wq->mutex);
88 pthread_attr_destroy(&wq->attr);
92 wq->first = wq->last = NULL;
93 wq->max_workers = threads; /* max threads to create */
94 wq->num_workers = 0; /* no threads yet */
95 wq->idle_workers = 0; /* no idle threads */
96 wq->engine = engine; /* routine to run */
97 wq->valid = WORKQ_VALID;
102 * Destroy a work queue
104 * Returns: 0 on success
107 int workq_destroy(workq_t *wq)
109 int stat, stat1, stat2;
111 if (wq->valid != WORKQ_VALID) {
114 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
117 wq->valid = 0; /* prevent any more operations */
120 * If any threads are active, wake them
122 if (wq->num_workers > 0) {
124 if (wq->idle_workers) {
125 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
126 pthread_mutex_unlock(&wq->mutex);
130 while (wq->num_workers > 0) {
131 if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
132 pthread_mutex_unlock(&wq->mutex);
137 if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
140 stat = pthread_mutex_destroy(&wq->mutex);
141 stat1 = pthread_cond_destroy(&wq->work);
142 stat2 = pthread_attr_destroy(&wq->attr);
143 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
148 * Add work to a queue
149 * wq is a queue that was created with workq_init
150 * element is a user unique item that will be passed to the
152 * work_item will get internal work queue item -- if it is not NULL
153 * priority if non-zero will cause the item to be placed on the
154 * head of the list instead of the tail.
156 int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
162 Dmsg0(1400, "workq_add\n");
163 if (wq->valid != WORKQ_VALID) {
167 if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
170 item->data = element;
172 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
177 Dmsg0(1400, "add item to queue\n");
179 /* Add to head of queue */
180 if (wq->first == NULL) {
184 item->next = wq->first;
188 /* Add to end of queue */
189 if (wq->first == NULL) {
192 wq->last->next = item;
197 /* if any threads are idle, wake one */
198 if (wq->idle_workers > 0) {
199 Dmsg0(1400, "Signal worker\n");
200 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
201 pthread_mutex_unlock(&wq->mutex);
204 } else if (wq->num_workers < wq->max_workers) {
205 Dmsg0(1400, "Create worker thread\n");
206 /* No idle threads so create a new one */
207 set_thread_concurrency(wq->max_workers + 1);
208 if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
209 pthread_mutex_unlock(&wq->mutex);
214 pthread_mutex_unlock(&wq->mutex);
215 Dmsg0(1400, "Return workq_add\n");
216 /* Return work_item if requested */
224 * Remove work from a queue
225 * wq is a queue that was created with workq_init
226 * work_item is an element of work
228 * Note, it is "removed" by immediately calling a processing routine.
229 * if you want to cancel it, you need to provide some external means
232 int workq_remove(workq_t *wq, workq_ele_t *work_item)
236 workq_ele_t *item, *prev;
238 Dmsg0(1400, "workq_remove\n");
239 if (wq->valid != WORKQ_VALID) {
243 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
247 for (prev=item=wq->first; item; item=item->next) {
248 if (item == work_item) {
258 /* Move item to be first on list */
259 if (wq->first != work_item) {
260 prev->next = work_item->next;
261 if (wq->last == work_item) {
264 work_item->next = wq->first;
265 wq->first = work_item;
268 /* if any threads are idle, wake one */
269 if (wq->idle_workers > 0) {
270 Dmsg0(1400, "Signal worker\n");
271 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
272 pthread_mutex_unlock(&wq->mutex);
276 Dmsg0(1400, "Create worker thread\n");
277 /* No idle threads so create a new one */
278 set_thread_concurrency(wq->max_workers + 1);
279 if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
280 pthread_mutex_unlock(&wq->mutex);
285 pthread_mutex_unlock(&wq->mutex);
286 Dmsg0(1400, "Return workq_remove\n");
292 * This is the worker thread that serves the work queue.
293 * In due course, it will call the user's engine.
296 void *workq_server(void *arg)
298 struct timespec timeout;
299 workq_t *wq = (workq_t *)arg;
303 Dmsg0(1400, "Start workq_server\n");
304 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
312 Dmsg0(1400, "Top of for loop\n");
314 Dmsg0(1400, "gettimeofday()\n");
315 gettimeofday(&tv, &tz);
317 timeout.tv_sec = tv.tv_sec + 2;
319 while (wq->first == NULL && !wq->quit) {
321 * Wait 2 seconds, then if no more work, exit
323 Dmsg0(1400, "pthread_cond_timedwait()\n");
324 #ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN
325 /* CYGWIN dies with a page fault the second
326 * time that pthread_cond_timedwait() is called
329 pthread_mutex_lock(&wq->mutex);
332 stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
334 Dmsg1(1400, "timedwait=%d\n", stat);
335 if (stat == ETIMEDOUT) {
338 } else if (stat != 0) {
339 /* This shouldn't happen */
340 Dmsg0(1400, "This shouldn't happen\n");
342 pthread_mutex_unlock(&wq->mutex);
348 wq->first = we->next;
349 if (wq->last == we) {
352 if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
355 /* Call user's routine here */
356 Dmsg0(1400, "Calling user engine.\n");
357 wq->engine(we->data);
358 Dmsg0(1400, "Back from user engine.\n");
359 free(we); /* release work entry */
360 Dmsg0(1400, "relock mutex\n");
361 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
364 Dmsg0(1400, "Done lock mutex\n");
367 * If no more work request, and we are asked to quit, then do it
369 if (wq->first == NULL && wq->quit) {
371 if (wq->num_workers == 0) {
372 Dmsg0(1400, "Wake up destroy routine\n");
373 /* Wake up destroy routine if he is waiting */
374 pthread_cond_broadcast(&wq->work);
376 Dmsg0(1400, "Unlock mutex\n");
377 pthread_mutex_unlock(&wq->mutex);
378 Dmsg0(1400, "Return from workq_server\n");
381 Dmsg0(1400, "Check for work request\n");
383 * If no more work requests, and we waited long enough, quit
385 Dmsg1(1400, "wq->first==NULL = %d\n", wq->first==NULL);
386 Dmsg1(1400, "timedout=%d\n", timedout);
387 if (wq->first == NULL && timedout) {
388 Dmsg0(1400, "break big loop\n");
392 Dmsg0(1400, "Loop again\n");
393 } /* end of big for loop */
395 Dmsg0(1400, "unlock mutex\n");
396 pthread_mutex_unlock(&wq->mutex);
397 Dmsg0(1400, "End workq_server\n");