2 Bacula® - The Network Backup Solution
4 Copyright (C) 2001-2008 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula work queue routines. Permits passing work to
32 * Kern Sibbald, January MMI
36 * This code adapted from "Programming with POSIX Threads", by
41 * static workq_t job_wq; define work queue
44 * if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
46 * Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", be.bstrerror(errno));
49 * Add an item to the queue
50 * if ((stat = workq_add(&job_wq, (void *)jcr)) != 0) {
52 * Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", be.bstrerror(errno));
56 * workq_destroy(workq_t *wq);
63 /* Forward referenced functions */
64 extern "C" void *workq_server(void *arg);
67 * Initialize a work queue
69 * Returns: 0 on success
72 int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
76 if ((stat = pthread_attr_init(&wq->attr)) != 0) {
79 if ((stat = pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
80 pthread_attr_destroy(&wq->attr);
83 if ((stat = pthread_mutex_init(&wq->mutex, NULL)) != 0) {
84 pthread_attr_destroy(&wq->attr);
87 if ((stat = pthread_cond_init(&wq->work, NULL)) != 0) {
88 pthread_mutex_destroy(&wq->mutex);
89 pthread_attr_destroy(&wq->attr);
93 wq->first = wq->last = NULL;
94 wq->max_workers = threads; /* max threads to create */
95 wq->num_workers = 0; /* no threads yet */
96 wq->idle_workers = 0; /* no idle threads */
97 wq->engine = engine; /* routine to run */
98 wq->valid = WORKQ_VALID;
103 * Destroy a work queue
105 * Returns: 0 on success
108 int workq_destroy(workq_t *wq)
110 int stat, stat1, stat2;
112 if (wq->valid != WORKQ_VALID) {
116 wq->valid = 0; /* prevent any more operations */
119 * If any threads are active, wake them
121 if (wq->num_workers > 0) {
123 if (wq->idle_workers) {
124 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
129 while (wq->num_workers > 0) {
130 if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
137 stat = pthread_mutex_destroy(&wq->mutex);
138 stat1 = pthread_cond_destroy(&wq->work);
139 stat2 = pthread_attr_destroy(&wq->attr);
140 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
145 * Add work to a queue
146 * wq is a queue that was created with workq_init
147 * element is a user unique item that will be passed to the
149 * work_item will get internal work queue item -- if it is not NULL
150 * priority if non-zero will cause the item to be placed on the
151 * head of the list instead of the tail.
153 int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
159 Dmsg0(1400, "workq_add\n");
160 if (wq->valid != WORKQ_VALID) {
164 if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
167 item->data = element;
171 Dmsg0(1400, "add item to queue\n");
173 /* Add to head of queue */
174 if (wq->first == NULL) {
178 item->next = wq->first;
182 /* Add to end of queue */
183 if (wq->first == NULL) {
186 wq->last->next = item;
191 /* if any threads are idle, wake one */
192 if (wq->idle_workers > 0) {
193 Dmsg0(1400, "Signal worker\n");
194 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
198 } else if (wq->num_workers < wq->max_workers) {
199 Dmsg0(1400, "Create worker thread\n");
200 /* No idle threads so create a new one */
201 set_thread_concurrency(wq->max_workers + 1);
202 if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
209 Dmsg0(1400, "Return workq_add\n");
210 /* Return work_item if requested */
218 * Remove work from a queue
219 * wq is a queue that was created with workq_init
220 * work_item is an element of work
222 * Note, it is "removed" by immediately calling a processing routine.
223 * if you want to cancel it, you need to provide some external means
226 int workq_remove(workq_t *wq, workq_ele_t *work_item)
230 workq_ele_t *item, *prev;
232 Dmsg0(1400, "workq_remove\n");
233 if (wq->valid != WORKQ_VALID) {
239 for (prev=item=wq->first; item; item=item->next) {
240 if (item == work_item) {
250 /* Move item to be first on list */
251 if (wq->first != work_item) {
252 prev->next = work_item->next;
253 if (wq->last == work_item) {
256 work_item->next = wq->first;
257 wq->first = work_item;
260 /* if any threads are idle, wake one */
261 if (wq->idle_workers > 0) {
262 Dmsg0(1400, "Signal worker\n");
263 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
268 Dmsg0(1400, "Create worker thread\n");
269 /* No idle threads so create a new one */
270 set_thread_concurrency(wq->max_workers + 1);
271 if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
278 Dmsg0(1400, "Return workq_remove\n");
284 * This is the worker thread that serves the work queue.
285 * In due course, it will call the user's engine.
288 void *workq_server(void *arg)
290 struct timespec timeout;
291 workq_t *wq = (workq_t *)arg;
295 Dmsg0(1400, "Start workq_server\n");
297 set_jcr_in_tsd(INVALID_JCR);
303 Dmsg0(1400, "Top of for loop\n");
305 Dmsg0(1400, "gettimeofday()\n");
306 gettimeofday(&tv, &tz);
308 timeout.tv_sec = tv.tv_sec + 2;
310 while (wq->first == NULL && !wq->quit) {
312 * Wait 2 seconds, then if no more work, exit
314 Dmsg0(1400, "pthread_cond_timedwait()\n");
315 #ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN
316 /* CYGWIN dies with a page fault the second
317 * time that pthread_cond_timedwait() is called
323 stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
325 Dmsg1(1400, "timedwait=%d\n", stat);
326 if (stat == ETIMEDOUT) {
329 } else if (stat != 0) {
330 /* This shouldn't happen */
331 Dmsg0(1400, "This shouldn't happen\n");
339 wq->first = we->next;
340 if (wq->last == we) {
344 /* Call user's routine here */
345 Dmsg0(1400, "Calling user engine.\n");
346 wq->engine(we->data);
347 Dmsg0(1400, "Back from user engine.\n");
348 free(we); /* release work entry */
349 Dmsg0(1400, "relock mutex\n");
351 Dmsg0(1400, "Done lock mutex\n");
354 * If no more work request, and we are asked to quit, then do it
356 if (wq->first == NULL && wq->quit) {
358 if (wq->num_workers == 0) {
359 Dmsg0(1400, "Wake up destroy routine\n");
360 /* Wake up destroy routine if he is waiting */
361 pthread_cond_broadcast(&wq->work);
363 Dmsg0(1400, "Unlock mutex\n");
365 Dmsg0(1400, "Return from workq_server\n");
368 Dmsg0(1400, "Check for work request\n");
370 * If no more work requests, and we waited long enough, quit
372 Dmsg1(1400, "wq->first==NULL = %d\n", wq->first==NULL);
373 Dmsg1(1400, "timedout=%d\n", timedout);
374 if (wq->first == NULL && timedout) {
375 Dmsg0(1400, "break big loop\n");
379 Dmsg0(1400, "Loop again\n");
380 } /* end of big for loop */
382 Dmsg0(1400, "unlock mutex\n");
384 Dmsg0(1400, "End workq_server\n");