/*
   Bacula® - The Network Backup Solution

   Copyright (C) 2001-2014 Free Software Foundation Europe e.V.

   The main author of Bacula is Kern Sibbald, with contributions from many
   others, a complete list can be found in the file AUTHORS.

   You may use this file and others of this release according to the
   license defined in the LICENSE file, which includes the Affero General
   Public License, v3.0 ("AGPLv3") and some additional permissions and
   terms pursuant to its AGPLv3 Section 7.

   Bacula® is a registered trademark of Kern Sibbald.
*/
/*
 * Bacula work queue routines. Permits passing work to
 *  multiple threads.
 *
 *  Kern Sibbald, January MMI
 *
 *  This code adapted from "Programming with POSIX Threads", by
 *    David R. Butenhof
 *
 * Example:
 *
 * static workq_t job_wq;    define work queue
 *
 *  Initialize queue
 *  if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
 *     berrno be;
 *     Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", be.bstrerror(errno));
 *  }
 *
 *  Add an item to the queue (tail of the queue, internal item not wanted)
 *  if ((stat = workq_add(&job_wq, (void *)jcr, NULL, 0)) != 0) {
 *     berrno be;
 *     Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", be.bstrerror(errno));
 *  }
 *
 *  Terminate the queue
 *  workq_destroy(&job_wq);
 *
 */
/* Forward referenced functions */
extern "C" void *workq_server(void *arg);
53 * Initialize a work queue
55 * Returns: 0 on success
58 int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
62 if ((stat = pthread_attr_init(&wq->attr)) != 0) {
65 if ((stat = pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
66 pthread_attr_destroy(&wq->attr);
69 if ((stat = pthread_mutex_init(&wq->mutex, NULL)) != 0) {
70 pthread_attr_destroy(&wq->attr);
73 if ((stat = pthread_cond_init(&wq->work, NULL)) != 0) {
74 pthread_mutex_destroy(&wq->mutex);
75 pthread_attr_destroy(&wq->attr);
79 wq->first = wq->last = NULL;
80 wq->max_workers = threads; /* max threads to create */
81 wq->num_workers = 0; /* no threads yet */
82 wq->idle_workers = 0; /* no idle threads */
83 wq->engine = engine; /* routine to run */
84 wq->valid = WORKQ_VALID;
89 * Destroy a work queue
91 * Returns: 0 on success
94 int workq_destroy(workq_t *wq)
96 int stat, stat1, stat2;
98 if (wq->valid != WORKQ_VALID) {
102 wq->valid = 0; /* prevent any more operations */
105 * If any threads are active, wake them
107 if (wq->num_workers > 0) {
109 if (wq->idle_workers) {
110 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
115 while (wq->num_workers > 0) {
116 if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
123 stat = pthread_mutex_destroy(&wq->mutex);
124 stat1 = pthread_cond_destroy(&wq->work);
125 stat2 = pthread_attr_destroy(&wq->attr);
126 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
131 * Add work to a queue
132 * wq is a queue that was created with workq_init
133 * element is a user unique item that will be passed to the
135 * work_item will get internal work queue item -- if it is not NULL
136 * priority if non-zero will cause the item to be placed on the
137 * head of the list instead of the tail.
139 int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
145 Dmsg0(1400, "workq_add\n");
146 if (wq->valid != WORKQ_VALID) {
150 if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
153 item->data = element;
157 Dmsg0(1400, "add item to queue\n");
159 /* Add to head of queue */
160 if (wq->first == NULL) {
164 item->next = wq->first;
168 /* Add to end of queue */
169 if (wq->first == NULL) {
172 wq->last->next = item;
177 /* if any threads are idle, wake one */
178 if (wq->idle_workers > 0) {
179 Dmsg0(1400, "Signal worker\n");
180 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
184 } else if (wq->num_workers < wq->max_workers) {
185 Dmsg0(1400, "Create worker thread\n");
186 /* No idle threads so create a new one */
187 set_thread_concurrency(wq->max_workers + 1);
188 if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
195 Dmsg0(1400, "Return workq_add\n");
196 /* Return work_item if requested */
204 * Remove work from a queue
205 * wq is a queue that was created with workq_init
206 * work_item is an element of work
208 * Note, it is "removed" by immediately calling a processing routine.
209 * if you want to cancel it, you need to provide some external means
212 int workq_remove(workq_t *wq, workq_ele_t *work_item)
216 workq_ele_t *item, *prev;
218 Dmsg0(1400, "workq_remove\n");
219 if (wq->valid != WORKQ_VALID) {
225 for (prev=item=wq->first; item; item=item->next) {
226 if (item == work_item) {
236 /* Move item to be first on list */
237 if (wq->first != work_item) {
238 prev->next = work_item->next;
239 if (wq->last == work_item) {
242 work_item->next = wq->first;
243 wq->first = work_item;
246 /* if any threads are idle, wake one */
247 if (wq->idle_workers > 0) {
248 Dmsg0(1400, "Signal worker\n");
249 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
254 Dmsg0(1400, "Create worker thread\n");
255 /* No idle threads so create a new one */
256 set_thread_concurrency(wq->max_workers + 1);
257 if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
264 Dmsg0(1400, "Return workq_remove\n");
270 * This is the worker thread that serves the work queue.
271 * In due course, it will call the user's engine.
274 void *workq_server(void *arg)
276 struct timespec timeout;
277 workq_t *wq = (workq_t *)arg;
281 Dmsg0(1400, "Start workq_server\n");
283 set_jcr_in_tsd(INVALID_JCR);
289 Dmsg0(1400, "Top of for loop\n");
291 Dmsg0(1400, "gettimeofday()\n");
292 gettimeofday(&tv, &tz);
294 timeout.tv_sec = tv.tv_sec + 2;
296 while (wq->first == NULL && !wq->quit) {
298 * Wait 2 seconds, then if no more work, exit
300 Dmsg0(1400, "pthread_cond_timedwait()\n");
301 stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
302 Dmsg1(1400, "timedwait=%d\n", stat);
303 if (stat == ETIMEDOUT) {
306 } else if (stat != 0) {
307 /* This shouldn't happen */
308 Dmsg0(1400, "This shouldn't happen\n");
316 wq->first = we->next;
317 if (wq->last == we) {
321 /* Call user's routine here */
322 Dmsg0(1400, "Calling user engine.\n");
323 wq->engine(we->data);
324 Dmsg0(1400, "Back from user engine.\n");
325 free(we); /* release work entry */
326 Dmsg0(1400, "relock mutex\n");
328 Dmsg0(1400, "Done lock mutex\n");
331 * If no more work request, and we are asked to quit, then do it
333 if (wq->first == NULL && wq->quit) {
335 if (wq->num_workers == 0) {
336 Dmsg0(1400, "Wake up destroy routine\n");
337 /* Wake up destroy routine if he is waiting */
338 pthread_cond_broadcast(&wq->work);
340 Dmsg0(1400, "Unlock mutex\n");
342 Dmsg0(1400, "Return from workq_server\n");
345 Dmsg0(1400, "Check for work request\n");
347 * If no more work requests, and we waited long enough, quit
349 Dmsg1(1400, "wq->first==NULL = %d\n", wq->first==NULL);
350 Dmsg1(1400, "timedout=%d\n", timedout);
351 if (wq->first == NULL && timedout) {
352 Dmsg0(1400, "break big loop\n");
356 Dmsg0(1400, "Loop again\n");
357 } /* end of big for loop */
359 Dmsg0(1400, "unlock mutex\n");
361 Dmsg0(1400, "End workq_server\n");