/*
   Bacula(R) - The Network Backup Solution

   Copyright (C) 2000-2017 Kern Sibbald

   The original author of Bacula is Kern Sibbald, with contributions
   from many others, a complete list can be found in the file AUTHORS.

   You may use this file and others of this release according to the
   license defined in the LICENSE file, which includes the Affero General
   Public License, v3.0 ("AGPLv3") and some additional permissions and
   terms pursuant to its AGPLv3 Section 7.

   This notice must be preserved when any source code is
   conveyed and/or propagated.

   Bacula(R) is a registered trademark of Kern Sibbald.
*/
/*
 * Bacula work queue routines. Permits passing work to
 *  multiple threads.
 *
 *  Kern Sibbald, January MMI
 *
 *  This code adapted from "Programming with POSIX Threads", by
 *    David R. Butenhof
 *
 * Example:
 *
 * static workq_t job_wq;    define work queue
 *
 *  Initialize queue
 *  if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
 *     berrno be;
 *     Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", be.bstrerror(stat));
 *  }
 *
 *  Add an item to the queue (no handle wanted, normal priority)
 *  if ((stat = workq_add(&job_wq, (void *)jcr, NULL, 0)) != 0) {
 *     berrno be;
 *     Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", be.bstrerror(stat));
 *  }
 *
 *  Wait for all queued work to be completed
 *  if ((stat = workq_wait_idle(&job_wq)) != 0) {
 *     berrno be;
 *     Emsg1(M_ABORT, 0, "Could not wait for idle: ERR=%s\n", be.bstrerror(stat));
 *  }
 *
 *  Terminate the queue
 *  workq_destroy(&job_wq);
 *
 */
/* Forward referenced functions: workq_server is handed to pthread_create before its definition */
extern "C" void *workq_server(void *arg);
62 * Initialize a work queue
64 * Returns: 0 on success
67 int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
71 if ((stat = pthread_attr_init(&wq->attr)) != 0) {
74 if ((stat = pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
75 pthread_attr_destroy(&wq->attr);
78 if ((stat = pthread_mutex_init(&wq->mutex, NULL)) != 0) {
79 pthread_attr_destroy(&wq->attr);
82 if ((stat = pthread_cond_init(&wq->work, NULL)) != 0) {
83 pthread_mutex_destroy(&wq->mutex);
84 pthread_attr_destroy(&wq->attr);
87 if ((stat = pthread_cond_init(&wq->idle, NULL)) != 0) {
88 pthread_mutex_destroy(&wq->mutex);
89 pthread_attr_destroy(&wq->attr);
90 pthread_cond_destroy(&wq->work);
94 wq->first = wq->last = NULL;
95 wq->max_workers = threads; /* max threads to create */
96 wq->num_workers = 0; /* no threads yet */
97 wq->num_running = 0; /* no running threads */
98 wq->idle_workers = 0; /* no idle threads */
99 wq->engine = engine; /* routine to run */
100 wq->valid = WORKQ_VALID;
105 * Destroy a work queue
107 * Returns: 0 on success
110 int workq_destroy(workq_t *wq)
112 int stat, stat1, stat2, stat3;
114 if (wq->valid != WORKQ_VALID) {
118 wq->valid = 0; /* prevent any more operations */
121 * If any threads are active, wake them
123 if (wq->num_workers > 0) {
125 if (wq->idle_workers) {
126 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
131 while (wq->num_workers > 0) {
132 if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
139 stat = pthread_mutex_destroy(&wq->mutex);
140 stat1 = pthread_cond_destroy(&wq->work);
141 stat2 = pthread_attr_destroy(&wq->attr);
142 stat3 = pthread_cond_destroy(&wq->idle);
143 if (stat != 0) return stat;
144 if (stat1 != 0) return stat1;
145 if (stat2 != 0) return stat2;
146 if (stat3 != 0) return stat3;
151 * Wait for work to terminate
153 * Returns: 0 on success
156 int workq_wait_idle(workq_t *wq)
160 if (wq->valid != WORKQ_VALID) {
165 /* While there is work, wait */
166 while (wq->num_running || wq->first != NULL) {
167 if ((stat = pthread_cond_wait(&wq->idle, &wq->mutex)) != 0) {
179 * Add work to a queue
180 * wq is a queue that was created with workq_init
181 * element is a user unique item that will be passed to the
183 * work_item will get internal work queue item -- if it is not NULL
184 * priority if non-zero will cause the item to be placed on the
185 * head of the list instead of the tail.
187 int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
193 Dmsg0(1400, "workq_add\n");
194 if (wq->valid != WORKQ_VALID) {
198 if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
201 item->data = element;
205 Dmsg0(1400, "add item to queue\n");
207 /* Add to head of queue */
208 if (wq->first == NULL) {
212 item->next = wq->first;
216 /* Add to end of queue */
217 if (wq->first == NULL) {
220 wq->last->next = item;
225 /* if any threads are idle, wake one */
226 if (wq->idle_workers > 0) {
227 Dmsg0(1400, "Signal worker\n");
228 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
232 } else if (wq->num_workers < wq->max_workers) {
233 Dmsg0(1400, "Create worker thread\n");
234 /* No idle threads so create a new one */
235 set_thread_concurrency(wq->max_workers + 1);
236 if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
243 Dmsg0(1400, "Return workq_add\n");
244 /* Return work_item if requested */
252 * Remove work from a queue
253 * wq is a queue that was created with workq_init
254 * work_item is an element of work
256 * Note, it is "removed" by immediately calling a processing routine.
257 * if you want to cancel it, you need to provide some external means
260 int workq_remove(workq_t *wq, workq_ele_t *work_item)
264 workq_ele_t *item, *prev;
266 Dmsg0(1400, "workq_remove\n");
267 if (wq->valid != WORKQ_VALID) {
273 for (prev=item=wq->first; item; item=item->next) {
274 if (item == work_item) {
284 /* Move item to be first on list */
285 if (wq->first != work_item) {
286 prev->next = work_item->next;
287 if (wq->last == work_item) {
290 work_item->next = wq->first;
291 wq->first = work_item;
294 /* if any threads are idle, wake one */
295 if (wq->idle_workers > 0) {
296 Dmsg0(1400, "Signal worker\n");
297 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
302 Dmsg0(1400, "Create worker thread\n");
303 /* No idle threads so create a new one */
304 set_thread_concurrency(wq->max_workers + 1);
305 if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
312 Dmsg0(1400, "Return workq_remove\n");
318 * This is the worker thread that serves the work queue.
319 * In due course, it will call the user's engine.
322 void *workq_server(void *arg)
324 struct timespec timeout;
325 workq_t *wq = (workq_t *)arg;
329 Dmsg0(1400, "Start workq_server\n");
331 set_jcr_in_tsd(INVALID_JCR);
337 Dmsg0(1400, "Top of for loop\n");
339 Dmsg0(1400, "gettimeofday()\n");
340 gettimeofday(&tv, &tz);
342 timeout.tv_sec = tv.tv_sec + 2;
344 while (wq->first == NULL && !wq->quit) {
346 * Wait 2 seconds, then if no more work, exit
348 Dmsg0(1400, "pthread_cond_timedwait()\n");
349 stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
350 Dmsg1(1400, "timedwait=%d\n", stat);
351 if (stat == ETIMEDOUT) {
354 } else if (stat != 0) {
355 /* This shouldn't happen */
356 Dmsg0(1400, "This shouldn't happen\n");
364 wq->first = we->next;
365 if (wq->last == we) {
370 /* Call user's routine here */
371 Dmsg0(1400, "Calling user engine.\n");
372 wq->engine(we->data);
373 Dmsg0(1400, "Back from user engine.\n");
374 free(we); /* release work entry */
375 Dmsg0(1400, "relock mutex\n");
378 Dmsg0(1400, "Done lock mutex\n");
380 if (wq->first == NULL && !wq->num_running) {
381 pthread_cond_broadcast(&wq->idle);
384 * If no more work request, and we are asked to quit, then do it
386 if (wq->first == NULL && wq->quit) {
388 if (wq->num_workers == 0) {
389 Dmsg0(1400, "Wake up destroy routine\n");
390 /* Wake up destroy routine if he is waiting */
391 pthread_cond_broadcast(&wq->work);
393 Dmsg0(1400, "Unlock mutex\n");
395 Dmsg0(1400, "Return from workq_server\n");
398 Dmsg0(1400, "Check for work request\n");
400 * If no more work requests, and we waited long enough, quit
402 Dmsg1(1400, "wq->first==NULL = %d\n", wq->first==NULL);
403 Dmsg1(1400, "timedout=%d\n", timedout);
404 if (wq->first == NULL && timedout) {
405 Dmsg0(1400, "break big loop\n");
409 Dmsg0(1400, "Loop again\n");
410 } /* end of big for loop */
412 Dmsg0(1400, "unlock mutex\n");
414 Dmsg0(1400, "End workq_server\n");
//=================================================

/* Seconds each test work item sleeps inside callback() */
#define TEST_SLEEP_TIME_IN_SECONDS 3
/* Worker limit handed to workq_init() by the test program */
#define TEST_MAX_NUM_WORKERS 5
/* Number of work items the test program queues per job */
#define TEST_NUM_WORKS 10
427 void *callback(void *ctx)
429 JCR* jcr = (JCR*)ctx;
433 Jmsg1(jcr, M_INFO, 0, _("workq_test: thread %d : now starting work....\n"), (int)pthread_self());
434 sleep(TEST_SLEEP_TIME_IN_SECONDS);
435 Jmsg1(jcr, M_INFO, 0, _("workq_test: thread %d : ...work completed.\n"), (int)pthread_self());
/*
 * Globals defined by the test program.
 * NOTE(review): none of them is used by the code visible in this file;
 *  presumably they satisfy references from the libraries it links
 *  against -- confirm before removing any.
 */
char *configfile = NULL;
//STORES *me = NULL;                    /* our Global resource */
bool forge_on = false;                  /* proceed inspite of I/O errors */
pthread_mutex_t device_release_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t wait_device_release = PTHREAD_COND_INITIALIZER;
447 int main (int argc, char *argv[])
451 void * start_heap = sbrk(0);
454 setlocale(LC_ALL, "");
455 bindtextdomain("bacula", LOCALEDIR);
456 textdomain("bacula");
458 my_name_is(argc, argv, "workq_test");
459 init_msg(NULL, NULL);
460 daemon_start_time = time(NULL);
461 set_thread_concurrency(150);
462 lmgr_init_thread(); /* initialize the lockmanager stack */
463 pthread_attr_init(&attr);
469 /* Start work queues */
470 if ((stat = workq_init(&queue, TEST_MAX_NUM_WORKERS, callback)) != 0)
473 Emsg1(M_ABORT, 0, _("Could not init work queue: ERR=%s\n"), be.bstrerror());
476 /* job1 is created and pseudo-submits some work to the work queue*/
477 JCR *jcr1 = new_jcr(sizeof(JCR), NULL);
479 workq_ele_t * ret(0);
480 for (int w=0; w<TEST_NUM_WORKS; ++w)
482 if ((stat = workq_add(&queue, jcr1, &ret, 0)) != 0)
485 Emsg1(M_ABORT, 0, _("Could not add work to queue: ERR=%s\n"), be.bstrerror());
489 JCR *jcr2 = new_jcr(sizeof(JCR), NULL);
491 for (int w=0; w<TEST_NUM_WORKS; ++w)
493 if ((stat = workq_add(&queue, jcr2, &ret, 0)) != 0)
496 Emsg1(M_ABORT, 0, _("Could not add work to queue: ERR=%s\n"), be.bstrerror());
500 printf("--------------------------------------------------------------\n");
501 printf("Start workq_wait_idle ....\n");
502 if ((stat = workq_wait_idle(&queue)) != 0)
505 Emsg1(M_ABORT, 0, _("Waiting for workq to be empty: ERR=%s\n"), be.bstrerror());
507 printf("... workq_wait_idle completed.\n");
508 printf("--------------------------------------------------------------\n");
510 printf("Start workq_destroy ....\n");
511 if ((stat = workq_destroy(&queue)) != 0)
514 Emsg1(M_ABORT, 0, _("Error in workq_destroy: ERR=%s\n"), be.bstrerror());
516 printf("... workq_destroy completed.\n");