2 Bacula(R) - The Network Backup Solution
4 Copyright (C) 2000-2015 Kern Sibbald
5 Copyright (C) 2001-2014 Free Software Foundation Europe e.V.
7 The original author of Bacula is Kern Sibbald, with contributions
8 from many others, a complete list can be found in the file AUTHORS.
10 You may use this file and others of this release according to the
11 license defined in the LICENSE file, which includes the Affero General
12 Public License, v3.0 ("AGPLv3") and some additional permissions and
13 terms pursuant to its AGPLv3 Section 7.
15 This notice must be preserved when any source code is
16 conveyed and/or propagated.
18 Bacula(R) is a registered trademark of Kern Sibbald.
21 * Bacula work queue routines. Permits passing work to
24 * Kern Sibbald, January MMI
26 * This code adapted from "Programming with POSIX Threads", by
31 * static workq_t job_wq; define work queue
34 * if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
36 * Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", be.bstrerror(errno));
39 * Add an item to the queue
40 * if ((stat = workq_add(&job_wq, (void *)jcr, NULL, 0)) != 0) {
42 * Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", be.bstrerror(errno));
46 * workq_destroy(&job_wq);
53 /* Forward referenced functions: workq_server() is the thread start routine
      handed to pthread_create() by workq_add() and workq_remove(); it is
      declared with C linkage. */
54 extern "C" void *workq_server(void *arg);
57 * Initialize a work queue
59 * Returns: 0 on success
/*
 * workq_init -- prepare a work queue for use.
 *
 *   wq       queue structure to initialize
 *   threads  stored in wq->max_workers, the limit on worker threads
 *   engine   stored in wq->engine, the routine run for each queued item
 *
 * Sets up, in this order, the thread attribute object (detached state),
 * the mutex and the work condition variable. When a step fails, the
 * objects created by the earlier steps are destroyed again.
 *
 * NOTE(review): this listing skips lines (the embedded numbers jump), so
 * the statements ending each failure branch and the final return are not
 * visible -- presumably they return stat and 0; confirm in the full file.
 */
62 int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
/* Attribute object later passed to pthread_create() for every worker */
66 if ((stat = pthread_attr_init(&wq->attr)) != 0) {
/* Workers are created detached, so nobody has to join them */
69 if ((stat = pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
70 pthread_attr_destroy(&wq->attr);
73 if ((stat = pthread_mutex_init(&wq->mutex, NULL)) != 0) {
/* Undo the attribute initialization done above */
74 pthread_attr_destroy(&wq->attr);
77 if ((stat = pthread_cond_init(&wq->work, NULL)) != 0) {
/* Undo both earlier initializations, newest first */
78 pthread_mutex_destroy(&wq->mutex);
79 pthread_attr_destroy(&wq->attr);
/* Start with an empty list of work entries */
83 wq->first = wq->last = NULL;
84 wq->max_workers = threads; /* max threads to create */
85 wq->num_workers = 0; /* no threads yet */
86 wq->idle_workers = 0; /* no idle threads */
87 wq->engine = engine; /* routine to run */
/* Mark the queue usable; the other entry points check this value first */
88 wq->valid = WORKQ_VALID;
93 * Destroy a work queue
95 * Returns: 0 on success
/*
 * workq_destroy -- shut a work queue down and release its pthread objects.
 *
 *   wq  queue previously set up by workq_init()
 *
 * Marks the queue invalid, wakes idle workers, waits until num_workers
 * drops to zero, then destroys the mutex, the condition variable and the
 * thread attribute object. The value returned is the first non-zero
 * status of those three destroy calls, or 0 when all succeed.
 *
 * NOTE(review): pthread_cond_wait() requires wq->mutex to be locked by
 * the caller; the lock and unlock calls, and the statements inside the
 * failure branches, are not visible in this listing -- confirm in the
 * full file.
 */
98 int workq_destroy(workq_t *wq)
100 int stat, stat1, stat2;
/* Only a queue marked valid by workq_init() may be destroyed */
102 if (wq->valid != WORKQ_VALID) {
106 wq->valid = 0; /* prevent any more operations; workq_add() and workq_remove() test this flag */
109 * If any threads are active, wake them
111 if (wq->num_workers > 0) {
/* Idle workers are blocked on the work condition, so broadcast to all of them */
113 if (wq->idle_workers) {
114 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
/* Wait for the workers to finish; workq_server() broadcasts on the same
   condition once num_workers has reached zero */
119 while (wq->num_workers > 0) {
120 if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
/* All three objects are destroyed even if an earlier destroy fails */
127 stat = pthread_mutex_destroy(&wq->mutex);
128 stat1 = pthread_cond_destroy(&wq->work);
129 stat2 = pthread_attr_destroy(&wq->attr);
/* Report the first failure in call order, otherwise the last status */
130 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
135 * Add work to a queue
136 * wq is a queue that was created with workq_init
137 * element is a user-supplied item that will be passed to the engine routine
139 * work_item will get internal work queue item -- if it is not NULL
140 * priority if non-zero will cause the item to be placed on the
141 * head of the list instead of the tail.
/*
 * workq_add -- queue one item of work and make sure a worker will see it.
 *
 *   wq         queue previously set up by workq_init(); rejected unless
 *              wq->valid equals WORKQ_VALID
 *   element    caller data, stored in the new entry as item->data
 *   work_item  optional out-parameter for the newly allocated entry
 *   priority   selects insertion at the head instead of the tail
 *
 * NOTE(review): this listing skips lines (the embedded numbers jump);
 * every return statement, the test on priority and any locking of
 * wq->mutex are not visible. The comments describe the visible code only.
 */
143 int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
149 Dmsg0(1400, "workq_add\n");
/* Refuse to operate on a queue that is not marked valid */
150 if (wq->valid != WORKQ_VALID) {
/* Allocate the queue entry; workq_server() releases it with free() once
   the engine has run */
154 if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
157 item->data = element;
161 Dmsg0(1400, "add item to queue\n");
163 /* Add to head of queue; an empty queue is handled separately */
164 if (wq->first == NULL) {
/* Non-empty queue: the new entry points at the old head */
168 item->next = wq->first;
172 /* Add to end of queue; an empty queue is handled separately */
173 if (wq->first == NULL) {
/* Non-empty queue: link the new entry behind the current tail */
176 wq->last->next = item;
181 /* If any threads are idle, wake them; the broadcast wakes every waiter, not just one */
182 if (wq->idle_workers > 0) {
183 Dmsg0(1400, "Signal worker\n");
184 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
188 } else if (wq->num_workers < wq->max_workers) {
189 Dmsg0(1400, "Create worker thread\n");
190 /* No idle threads and still below max_workers, so create a new one */
191 set_thread_concurrency(wq->max_workers + 1);
/* The worker receives the queue itself as its argument and uses the
   attribute object prepared by workq_init() */
192 if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
199 Dmsg0(1400, "Return workq_add\n");
200 /* Return work_item if requested */
208 * Remove work from a queue
209 * wq is a queue that was created with workq_init
210 * work_item is an element of work
212 * Note, it is "removed" by moving it to the head of the queue so that a worker processes it immediately.
213 * If you want to cancel it, you need to provide some external means of doing so.
/*
 * workq_remove -- push an already queued entry to the front of the queue
 * and make sure a worker will see it.
 *
 *   wq         queue previously set up by workq_init(); rejected unless
 *              wq->valid equals WORKQ_VALID
 *   work_item  entry to look for in the queue
 *
 * NOTE(review): this listing skips lines (the embedded numbers jump);
 * the return statements, the handling of an entry that is not found, the
 * update of prev inside the search loop, the condition guarding thread
 * creation and any locking of wq->mutex are not visible. The comments
 * describe the visible code only.
 */
216 int workq_remove(workq_t *wq, workq_ele_t *work_item)
220 workq_ele_t *item, *prev;
222 Dmsg0(1400, "workq_remove\n");
/* Refuse to operate on a queue that is not marked valid */
223 if (wq->valid != WORKQ_VALID) {
/* Walk the list from the head looking for work_item */
229 for (prev=item=wq->first; item; item=item->next) {
230 if (item == work_item) {
240 /* Move item to be first on list */
241 if (wq->first != work_item) {
/* Unlink the entry from its current position */
242 prev->next = work_item->next;
/* The entry was the tail, so the tail pointer needs attention as well */
243 if (wq->last == work_item) {
/* Relink the entry in front of the old head */
246 work_item->next = wq->first;
247 wq->first = work_item;
250 /* If any threads are idle, wake them; the broadcast wakes every waiter, not just one */
251 if (wq->idle_workers > 0) {
252 Dmsg0(1400, "Signal worker\n");
253 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
258 Dmsg0(1400, "Create worker thread\n");
259 /* No idle threads so create a new one */
260 set_thread_concurrency(wq->max_workers + 1);
/* The worker receives the queue itself as its argument */
261 if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
268 Dmsg0(1400, "Return workq_remove\n");
274 * This is the worker thread that serves the work queue.
275 * In due course, it will call the user's engine.
/*
 * workq_server -- body of a worker thread.
 *
 *   arg  the workq_t the thread serves, passed through pthread_create()
 *
 * Repeatedly waits up to two seconds for work, takes the entry at the
 * head of the queue, runs wq->engine on its data and frees the entry.
 * The loop ends when the queue is empty and either a shutdown was
 * requested (wq->quit) or the wait timed out.
 *
 * NOTE(review): this listing skips lines (the embedded numbers jump);
 * the declarations of stat, tv, tz, we and timedout, all lock and unlock
 * calls on wq->mutex, the worker counters and the return statements are
 * not visible. The comments describe the visible code only.
 */
278 void *workq_server(void *arg)
280 struct timespec timeout;
281 workq_t *wq = (workq_t *)arg;
285 Dmsg0(1400, "Start workq_server\n");
287 set_jcr_in_tsd(INVALID_JCR);
293 Dmsg0(1400, "Top of for loop\n");
295 Dmsg0(1400, "gettimeofday()\n");
/* pthread_cond_timedwait() takes an absolute time, so start from now */
296 gettimeofday(&tv, &tz);
/* Deadline is two seconds after the current time */
298 timeout.tv_sec = tv.tv_sec + 2;
/* Sleep while there is nothing queued and no shutdown request */
300 while (wq->first == NULL && !wq->quit) {
302 * Wait 2 seconds, then if no more work, exit
304 Dmsg0(1400, "pthread_cond_timedwait()\n");
305 stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
306 Dmsg1(1400, "timedwait=%d\n", stat);
/* NOTE(review): the body of this branch is not visible; presumably it
   sets timedout, which is tested further down -- confirm */
307 if (stat == ETIMEDOUT) {
310 } else if (stat != 0) {
311 /* This shouldn't happen */
312 Dmsg0(1400, "This shouldn't happen\n");
/* Detach the head entry from the queue */
320 wq->first = we->next;
/* The head was also the tail, so the tail pointer needs attention */
321 if (wq->last == we) {
325 /* Call user's routine here */
326 Dmsg0(1400, "Calling user engine.\n");
327 wq->engine(we->data);
328 Dmsg0(1400, "Back from user engine.\n");
329 free(we); /* release work entry allocated with malloc() in workq_add() */
/* NOTE(review): the debug text says the mutex is re-acquired here; the
   lock call itself is not visible */
330 Dmsg0(1400, "relock mutex\n");
332 Dmsg0(1400, "Done lock mutex\n");
335 * If no more work request, and we are asked to quit, then do it
337 if (wq->first == NULL && wq->quit) {
/* Once no worker is left, wake workq_destroy(), which waits on the same
   condition until num_workers is zero */
339 if (wq->num_workers == 0) {
340 Dmsg0(1400, "Wake up destroy routine\n");
341 /* Wake up destroy routine if he is waiting */
342 pthread_cond_broadcast(&wq->work);
344 Dmsg0(1400, "Unlock mutex\n");
346 Dmsg0(1400, "Return from workq_server\n");
349 Dmsg0(1400, "Check for work request\n");
351 * If no more work requests, and we waited long enough, quit
353 Dmsg1(1400, "wq->first==NULL = %d\n", wq->first==NULL);
354 Dmsg1(1400, "timedout=%d\n", timedout);
/* Idle exit: nothing queued and the timed wait expired */
355 if (wq->first == NULL && timedout) {
356 Dmsg0(1400, "break big loop\n");
360 Dmsg0(1400, "Loop again\n");
361 } /* end of big for loop */
363 Dmsg0(1400, "unlock mutex\n");
365 Dmsg0(1400, "End workq_server\n");