2 Bacula(R) - The Network Backup Solution
4 Copyright (C) 2000-2016 Kern Sibbald
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
17 Bacula(R) is a registered trademark of Kern Sibbald.
20 * Bacula work queue routines. Permits passing work to
23 * Kern Sibbald, January MMI
25 * This code adapted from "Programming with POSIX Threads", by
30 * static workq_t job_wq; define work queue
33 * if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
35 * Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", be.bstrerror(errno));
38 * Add an item to the queue
39 * if ((stat = workq_add(&job_wq, (void *)jcr)) != 0) {
41 * Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", be.bstrerror(errno));
45 * workq_destroy(workq_t *wq);
52 /* Forward referenced functions */
53 extern "C" void *workq_server(void *arg);
56 * Initialize a work queue
58 * Returns: 0 on success
61 int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
65 if ((stat = pthread_attr_init(&wq->attr)) != 0) {
68 if ((stat = pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
69 pthread_attr_destroy(&wq->attr);
72 if ((stat = pthread_mutex_init(&wq->mutex, NULL)) != 0) {
73 pthread_attr_destroy(&wq->attr);
76 if ((stat = pthread_cond_init(&wq->work, NULL)) != 0) {
77 pthread_mutex_destroy(&wq->mutex);
78 pthread_attr_destroy(&wq->attr);
82 wq->first = wq->last = NULL;
83 wq->max_workers = threads; /* max threads to create */
84 wq->num_workers = 0; /* no threads yet */
85 wq->idle_workers = 0; /* no idle threads */
86 wq->engine = engine; /* routine to run */
87 wq->valid = WORKQ_VALID;
92 * Destroy a work queue
94 * Returns: 0 on success
97 int workq_destroy(workq_t *wq)
99 int stat, stat1, stat2;
101 if (wq->valid != WORKQ_VALID) {
105 wq->valid = 0; /* prevent any more operations */
108 * If any threads are active, wake them
110 if (wq->num_workers > 0) {
112 if (wq->idle_workers) {
113 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
118 while (wq->num_workers > 0) {
119 if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
126 stat = pthread_mutex_destroy(&wq->mutex);
127 stat1 = pthread_cond_destroy(&wq->work);
128 stat2 = pthread_attr_destroy(&wq->attr);
129 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
134 * Add work to a queue
135 * wq is a queue that was created with workq_init
136 * element is a user unique item that will be passed to the
138 * work_item will get internal work queue item -- if it is not NULL
139 * priority if non-zero will cause the item to be placed on the
140 * head of the list instead of the tail.
142 int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
148 Dmsg0(1400, "workq_add\n");
149 if (wq->valid != WORKQ_VALID) {
153 if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
156 item->data = element;
160 Dmsg0(1400, "add item to queue\n");
162 /* Add to head of queue */
163 if (wq->first == NULL) {
167 item->next = wq->first;
171 /* Add to end of queue */
172 if (wq->first == NULL) {
175 wq->last->next = item;
180 /* if any threads are idle, wake one */
181 if (wq->idle_workers > 0) {
182 Dmsg0(1400, "Signal worker\n");
183 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
187 } else if (wq->num_workers < wq->max_workers) {
188 Dmsg0(1400, "Create worker thread\n");
189 /* No idle threads so create a new one */
190 set_thread_concurrency(wq->max_workers + 1);
191 if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
198 Dmsg0(1400, "Return workq_add\n");
199 /* Return work_item if requested */
207 * Remove work from a queue
208 * wq is a queue that was created with workq_init
209 * work_item is an element of work
211 * Note, it is "removed" by immediately calling a processing routine.
212 * if you want to cancel it, you need to provide some external means
215 int workq_remove(workq_t *wq, workq_ele_t *work_item)
219 workq_ele_t *item, *prev;
221 Dmsg0(1400, "workq_remove\n");
222 if (wq->valid != WORKQ_VALID) {
228 for (prev=item=wq->first; item; item=item->next) {
229 if (item == work_item) {
239 /* Move item to be first on list */
240 if (wq->first != work_item) {
241 prev->next = work_item->next;
242 if (wq->last == work_item) {
245 work_item->next = wq->first;
246 wq->first = work_item;
249 /* if any threads are idle, wake one */
250 if (wq->idle_workers > 0) {
251 Dmsg0(1400, "Signal worker\n");
252 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
257 Dmsg0(1400, "Create worker thread\n");
258 /* No idle threads so create a new one */
259 set_thread_concurrency(wq->max_workers + 1);
260 if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
267 Dmsg0(1400, "Return workq_remove\n");
273 * This is the worker thread that serves the work queue.
274 * In due course, it will call the user's engine.
277 void *workq_server(void *arg)
279 struct timespec timeout;
280 workq_t *wq = (workq_t *)arg;
284 Dmsg0(1400, "Start workq_server\n");
286 set_jcr_in_tsd(INVALID_JCR);
292 Dmsg0(1400, "Top of for loop\n");
294 Dmsg0(1400, "gettimeofday()\n");
295 gettimeofday(&tv, &tz);
297 timeout.tv_sec = tv.tv_sec + 2;
299 while (wq->first == NULL && !wq->quit) {
301 * Wait 2 seconds, then if no more work, exit
303 Dmsg0(1400, "pthread_cond_timedwait()\n");
304 stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
305 Dmsg1(1400, "timedwait=%d\n", stat);
306 if (stat == ETIMEDOUT) {
309 } else if (stat != 0) {
310 /* This shouldn't happen */
311 Dmsg0(1400, "This shouldn't happen\n");
319 wq->first = we->next;
320 if (wq->last == we) {
324 /* Call user's routine here */
325 Dmsg0(1400, "Calling user engine.\n");
326 wq->engine(we->data);
327 Dmsg0(1400, "Back from user engine.\n");
328 free(we); /* release work entry */
329 Dmsg0(1400, "relock mutex\n");
331 Dmsg0(1400, "Done lock mutex\n");
334 * If no more work request, and we are asked to quit, then do it
336 if (wq->first == NULL && wq->quit) {
338 if (wq->num_workers == 0) {
339 Dmsg0(1400, "Wake up destroy routine\n");
340 /* Wake up destroy routine if he is waiting */
341 pthread_cond_broadcast(&wq->work);
343 Dmsg0(1400, "Unlock mutex\n");
345 Dmsg0(1400, "Return from workq_server\n");
348 Dmsg0(1400, "Check for work request\n");
350 * If no more work requests, and we waited long enough, quit
352 Dmsg1(1400, "wq->first==NULL = %d\n", wq->first==NULL);
353 Dmsg1(1400, "timedout=%d\n", timedout);
354 if (wq->first == NULL && timedout) {
355 Dmsg0(1400, "break big loop\n");
359 Dmsg0(1400, "Loop again\n");
360 } /* end of big for loop */
362 Dmsg0(1400, "unlock mutex\n");
364 Dmsg0(1400, "End workq_server\n");