]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/workq.c
Big backport from Enterprise
[bacula/bacula] / bacula / src / lib / workq.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2017 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is 
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  * Bacula work queue routines. Permits passing work to
21  *  multiple threads.
22  *
23  *  Kern Sibbald, January MMI
24  *
25  *  This code adapted from "Programming with POSIX Threads", by
26  *    David R. Butenhof
27  *
28  * Example:
29  *
30  * static workq_t job_wq;    define work queue
31  *
32  *  Initialize queue
33  *  if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
34  *     berrno be;
35  *     Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", be.bstrerror(errno));
36  *   }
37  *
38  *  Add an item to the queue
39  *  if ((stat = workq_add(&job_wq, (void *)jcr)) != 0) {
40  *      berrno be;
41  *      Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", be.bstrerror(errno));
42  *   }
43  *
44  *  Wait for all queued work to be completed
45  *  if ((stat = workq_wait_idle(&job_wq, (void *)jcr)) != 0) {
46  *     berrno be;
47  *     Emsg1(M_ABORT, 0, "Could not wait for idle: ERR=%s\n", be.bstrerror(errno));
48  *  }
49  *
50  *  Terminate the queue
51  *  workq_destroy(workq_t *wq);
52  *
53  */
54
55 #include "bacula.h"
56 #include "jcr.h"
57
58 /* Forward referenced functions */
59 extern "C" void *workq_server(void *arg);
60
61 /*
62  * Initialize a work queue
63  *
64  *  Returns: 0 on success
65  *           errno on failure
66  */
67 int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
68 {
69    int stat;
70
71    if ((stat = pthread_attr_init(&wq->attr)) != 0) {
72       return stat;
73    }
74    if ((stat = pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
75       pthread_attr_destroy(&wq->attr);
76       return stat;
77    }
78    if ((stat = pthread_mutex_init(&wq->mutex, NULL)) != 0) {
79       pthread_attr_destroy(&wq->attr);
80       return stat;
81    }
82    if ((stat = pthread_cond_init(&wq->work, NULL)) != 0) {
83       pthread_mutex_destroy(&wq->mutex);
84       pthread_attr_destroy(&wq->attr);
85       return stat;
86    }
87    if ((stat = pthread_cond_init(&wq->idle, NULL)) != 0) {
88       pthread_mutex_destroy(&wq->mutex);
89       pthread_attr_destroy(&wq->attr);
90       pthread_cond_destroy(&wq->work);
91       return stat;
92    }
93    wq->quit = 0;
94    wq->first = wq->last = NULL;
95    wq->max_workers = threads;         /* max threads to create */
96    wq->num_workers = 0;               /* no threads yet */
97    wq->num_running = 0;               /* no running threads */
98    wq->idle_workers = 0;              /* no idle threads */
99    wq->engine = engine;               /* routine to run */
100    wq->valid = WORKQ_VALID;
101    return 0;
102 }
103
104 /*        [B
105  * Destroy a work queue
106  *
107  * Returns: 0 on success
108  *          errno on failure
109  */
110 int workq_destroy(workq_t *wq)
111 {
112    int stat, stat1, stat2, stat3;
113
114   if (wq->valid != WORKQ_VALID) {
115      return EINVAL;
116   }
117   P(wq->mutex);
118   wq->valid = 0;                      /* prevent any more operations */
119
120   /*
121    * If any threads are active, wake them
122    */
123   if (wq->num_workers > 0) {
124      wq->quit = 1;
125      if (wq->idle_workers) {
126         if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
127            V(wq->mutex);
128            return stat;
129         }
130      }
131      while (wq->num_workers > 0) {
132         if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
133            V(wq->mutex);
134            return stat;
135         }
136      }
137   }
138   V(wq->mutex);
139   stat  = pthread_mutex_destroy(&wq->mutex);
140   stat1 = pthread_cond_destroy(&wq->work);
141   stat2 = pthread_attr_destroy(&wq->attr);
142   stat3 = pthread_cond_destroy(&wq->idle);
143   if (stat != 0) return stat;
144   if (stat1 != 0) return stat1;
145   if (stat2 != 0) return stat2;
146   if (stat3 != 0) return stat3;
147   return 0;
148 }
149
150 /*
151  * Wait for work to terminate
152  *
153  * Returns: 0 on success
154  *          errno on failure
155  */
156 int workq_wait_idle(workq_t *wq)
157 {
158    int stat;
159
160   if (wq->valid != WORKQ_VALID) {
161      return EINVAL;
162   }
163   P(wq->mutex);
164
165   /* While there is work, wait */
166   while (wq->num_running || wq->first != NULL) {
167      if ((stat = pthread_cond_wait(&wq->idle, &wq->mutex)) != 0) {
168         V(wq->mutex);
169         return stat;
170      }
171   }
172   V(wq->mutex);
173   return 0;
174 }
175
176
177
178 /*
179  *  Add work to a queue
180  *    wq is a queue that was created with workq_init
181  *    element is a user unique item that will be passed to the
182  *        processing routine
183  *    work_item will get internal work queue item -- if it is not NULL
184  *    priority if non-zero will cause the item to be placed on the
185  *        head of the list instead of the tail.
186  */
187 int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
188 {
189    int stat=0;
190    workq_ele_t *item;
191    pthread_t id;
192
193    Dmsg0(1400, "workq_add\n");
194    if (wq->valid != WORKQ_VALID) {
195       return EINVAL;
196    }
197
198    if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
199       return ENOMEM;
200    }
201    item->data = element;
202    item->next = NULL;
203    P(wq->mutex);
204
205    Dmsg0(1400, "add item to queue\n");
206    if (priority) {
207       /* Add to head of queue */
208       if (wq->first == NULL) {
209          wq->first = item;
210          wq->last = item;
211       } else {
212          item->next = wq->first;
213          wq->first = item;
214       }
215    } else {
216       /* Add to end of queue */
217       if (wq->first == NULL) {
218          wq->first = item;
219       } else {
220          wq->last->next = item;
221       }
222       wq->last = item;
223    }
224
225    /* if any threads are idle, wake one */
226    if (wq->idle_workers > 0) {
227       Dmsg0(1400, "Signal worker\n");
228       if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
229          V(wq->mutex);
230          return stat;
231       }
232    } else if (wq->num_workers < wq->max_workers) {
233       Dmsg0(1400, "Create worker thread\n");
234       /* No idle threads so create a new one */
235       set_thread_concurrency(wq->max_workers + 1);
236       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
237          V(wq->mutex);
238          return stat;
239       }
240       wq->num_workers++;
241    }
242    V(wq->mutex);
243    Dmsg0(1400, "Return workq_add\n");
244    /* Return work_item if requested */
245    if (work_item) {
246       *work_item = item;
247    }
248    return stat;
249 }
250
251 /*
252  *  Remove work from a queue
253  *    wq is a queue that was created with workq_init
254  *    work_item is an element of work
255  *
256  *   Note, it is "removed" by immediately calling a processing routine.
257  *    if you want to cancel it, you need to provide some external means
258  *    of doing so.
259  */
260 int workq_remove(workq_t *wq, workq_ele_t *work_item)
261 {
262    int stat, found = 0;
263    pthread_t id;
264    workq_ele_t *item, *prev;
265
266    Dmsg0(1400, "workq_remove\n");
267    if (wq->valid != WORKQ_VALID) {
268       return EINVAL;
269    }
270
271    P(wq->mutex);
272
273    for (prev=item=wq->first; item; item=item->next) {
274       if (item == work_item) {
275          found = 1;
276          break;
277       }
278       prev = item;
279    }
280    if (!found) {
281       return EINVAL;
282    }
283
284    /* Move item to be first on list */
285    if (wq->first != work_item) {
286       prev->next = work_item->next;
287       if (wq->last == work_item) {
288          wq->last = prev;
289       }
290       work_item->next = wq->first;
291       wq->first = work_item;
292    }
293
294    /* if any threads are idle, wake one */
295    if (wq->idle_workers > 0) {
296       Dmsg0(1400, "Signal worker\n");
297       if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
298          V(wq->mutex);
299          return stat;
300       }
301    } else {
302       Dmsg0(1400, "Create worker thread\n");
303       /* No idle threads so create a new one */
304       set_thread_concurrency(wq->max_workers + 1);
305       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
306          V(wq->mutex);
307          return stat;
308       }
309       wq->num_workers++;
310    }
311    V(wq->mutex);
312    Dmsg0(1400, "Return workq_remove\n");
313    return stat;
314 }
315
316
317 /*
318  * This is the worker thread that serves the work queue.
319  * In due course, it will call the user's engine.
320  */
321 extern "C"
322 void *workq_server(void *arg)
323 {
324    struct timespec timeout;
325    workq_t *wq = (workq_t *)arg;
326    workq_ele_t *we;
327    int stat, timedout;
328
329    Dmsg0(1400, "Start workq_server\n");
330    P(wq->mutex);
331    set_jcr_in_tsd(INVALID_JCR);
332
333    for (;;) {
334       struct timeval tv;
335       struct timezone tz;
336
337       Dmsg0(1400, "Top of for loop\n");
338       timedout = 0;
339       Dmsg0(1400, "gettimeofday()\n");
340       gettimeofday(&tv, &tz);
341       timeout.tv_nsec = 0;
342       timeout.tv_sec = tv.tv_sec + 2;
343
344       while (wq->first == NULL && !wq->quit) {
345          /*
346           * Wait 2 seconds, then if no more work, exit
347           */
348          Dmsg0(1400, "pthread_cond_timedwait()\n");
349          stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
350          Dmsg1(1400, "timedwait=%d\n", stat);
351          if (stat == ETIMEDOUT) {
352             timedout = 1;
353             break;
354          } else if (stat != 0) {
355             /* This shouldn't happen */
356             Dmsg0(1400, "This shouldn't happen\n");
357             wq->num_workers--;
358             V(wq->mutex);
359             return NULL;
360          }
361       }
362       we = wq->first;
363       if (we != NULL) {
364          wq->first = we->next;
365          if (wq->last == we) {
366             wq->last = NULL;
367          }
368          wq->num_running++;
369          V(wq->mutex);
370          /* Call user's routine here */
371          Dmsg0(1400, "Calling user engine.\n");
372          wq->engine(we->data);
373          Dmsg0(1400, "Back from user engine.\n");
374          free(we);                    /* release work entry */
375          Dmsg0(1400, "relock mutex\n");
376          P(wq->mutex);
377          wq->num_running--;
378          Dmsg0(1400, "Done lock mutex\n");
379       }
380       if (wq->first == NULL && !wq->num_running) {
381           pthread_cond_broadcast(&wq->idle);
382       }
383       /*
384        * If no more work request, and we are asked to quit, then do it
385        */
386       if (wq->first == NULL && wq->quit) {
387          wq->num_workers--;
388          if (wq->num_workers == 0) {
389             Dmsg0(1400, "Wake up destroy routine\n");
390             /* Wake up destroy routine if he is waiting */
391             pthread_cond_broadcast(&wq->work);
392          }
393          Dmsg0(1400, "Unlock mutex\n");
394          V(wq->mutex);
395          Dmsg0(1400, "Return from workq_server\n");
396          return NULL;
397       }
398       Dmsg0(1400, "Check for work request\n");
399       /*
400        * If no more work requests, and we waited long enough, quit
401        */
402       Dmsg1(1400, "wq->first==NULL = %d\n", wq->first==NULL);
403       Dmsg1(1400, "timedout=%d\n", timedout);
404       if (wq->first == NULL && timedout) {
405          Dmsg0(1400, "break big loop\n");
406          wq->num_workers--;
407          break;
408       }
409       Dmsg0(1400, "Loop again\n");
410    } /* end of big for loop */
411
412    Dmsg0(1400, "unlock mutex\n");
413    V(wq->mutex);
414    Dmsg0(1400, "End workq_server\n");
415    return NULL;
416 }
417
418
419 //=================================================
420 #ifdef TEST_PROGRAM
421
422 #define TEST_SLEEP_TIME_IN_SECONDS 3
423 #define TEST_MAX_NUM_WORKERS 5
424 #define TEST_NUM_WORKS 10
425
426
427 void *callback(void *ctx)
428 {
429    JCR* jcr = (JCR*)ctx;
430
431    if (jcr)
432    {
433       Jmsg1(jcr, M_INFO, 0, _("workq_test: thread %d : now starting work....\n"), (int)pthread_self());
434       sleep(TEST_SLEEP_TIME_IN_SECONDS);
435       Jmsg1(jcr, M_INFO, 0, _("workq_test: thread %d : ...work completed.\n"), (int)pthread_self());
436    }
437    return NULL;
438 }
439
440
441 char *configfile = NULL;
442 //STORES *me = NULL;                    /* our Global resource */
443 bool forge_on = false;                /* proceed inspite of I/O errors */
444 pthread_mutex_t device_release_mutex = PTHREAD_MUTEX_INITIALIZER;
445 pthread_cond_t wait_device_release = PTHREAD_COND_INITIALIZER;
446
447 int main (int argc, char *argv[])
448 {
449    pthread_attr_t attr;
450
451    void * start_heap = sbrk(0);
452    (void)start_heap;
453
454    setlocale(LC_ALL, "");
455    bindtextdomain("bacula", LOCALEDIR);
456    textdomain("bacula");
457    init_stack_dump();
458    my_name_is(argc, argv, "workq_test");
459    init_msg(NULL, NULL);
460    daemon_start_time = time(NULL);
461    set_thread_concurrency(150);
462    lmgr_init_thread(); /* initialize the lockmanager stack */
463    pthread_attr_init(&attr);
464
465    int stat(-1);
466    berrno be;
467
468    workq_t queue;
469    /* Start work queues */
470    if ((stat = workq_init(&queue, TEST_MAX_NUM_WORKERS, callback)) != 0)
471    {
472       be.set_errno(stat);
473       Emsg1(M_ABORT, 0, _("Could not init work queue: ERR=%s\n"), be.bstrerror());
474    }
475
476    /* job1 is created and pseudo-submits some work to the work queue*/
477    JCR *jcr1 = new_jcr(sizeof(JCR), NULL);
478    jcr1->JobId = 1;
479    workq_ele_t * ret(0);
480    for (int w=0; w<TEST_NUM_WORKS; ++w)
481    {
482       if ((stat = workq_add(&queue, jcr1, &ret, 0)) != 0)
483       {
484          be.set_errno(stat);
485          Emsg1(M_ABORT, 0, _("Could not add work to queue: ERR=%s\n"), be.bstrerror());
486       }
487    }
488
489    JCR *jcr2 = new_jcr(sizeof(JCR), NULL);
490    jcr2->JobId = 2;
491    for (int w=0; w<TEST_NUM_WORKS; ++w)
492    {
493       if ((stat = workq_add(&queue, jcr2, &ret, 0)) != 0)
494       {
495          be.set_errno(stat);
496          Emsg1(M_ABORT, 0, _("Could not add work to queue: ERR=%s\n"), be.bstrerror());
497       }
498    }
499
500    printf("--------------------------------------------------------------\n");
501    printf("Start workq_wait_idle ....\n");
502    if ((stat = workq_wait_idle(&queue)) != 0)
503    {
504       be.set_errno(stat);
505       Emsg1(M_ABORT, 0, _("Waiting for workq to be empty: ERR=%s\n"), be.bstrerror());
506    }
507    printf("... workq_wait_idle completed.\n");
508    printf("--------------------------------------------------------------\n");
509
510    printf("Start workq_destroy ....\n");
511    if ((stat = workq_destroy(&queue)) != 0)
512    {
513       be.set_errno(stat);
514       Emsg1(M_ABORT, 0, _("Error in workq_destroy: ERR=%s\n"), be.bstrerror());
515    }
516    printf("... workq_destroy completed.\n");
517
518    return 0;
519
520 }
521
522 #endif