]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/workq.c
Backport from Bacula Enterprise
[bacula/bacula] / bacula / src / lib / workq.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2015 Kern Sibbald
5    Copyright (C) 2001-2014 Free Software Foundation Europe e.V.
6
7    The original author of Bacula is Kern Sibbald, with contributions
8    from many others, a complete list can be found in the file AUTHORS.
9
10    You may use this file and others of this release according to the
11    license defined in the LICENSE file, which includes the Affero General
12    Public License, v3.0 ("AGPLv3") and some additional permissions and
13    terms pursuant to its AGPLv3 Section 7.
14
15    This notice must be preserved when any source code is 
16    conveyed and/or propagated.
17
18    Bacula(R) is a registered trademark of Kern Sibbald.
19 */
20 /*
21  * Bacula work queue routines. Permits passing work to
22  *  multiple threads.
23  *
24  *  Kern Sibbald, January MMI
25  *
26  *  This code adapted from "Programming with POSIX Threads", by
27  *    David R. Butenhof
28  *
29  * Example:
30  *
31  * static workq_t job_wq;    define work queue
32  *
33  *  Initialize queue
34  *  if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
35  *     berrno be;
36  *     Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", be.bstrerror(errno));
37  *   }
38  *
39  *  Add an item to the queue
40  *  if ((stat = workq_add(&job_wq, (void *)jcr)) != 0) {
41  *      berrno be;
42  *      Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", be.bstrerror(errno));
43  *   }
44  *
45  *  Terminate the queue
46  *  workq_destroy(workq_t *wq);
47  *
48  */
49
50 #include "bacula.h"
51 #include "jcr.h"
52
53 /* Forward referenced functions */
54 extern "C" void *workq_server(void *arg);
55
56 /*
57  * Initialize a work queue
58  *
59  *  Returns: 0 on success
60  *           errno on failure
61  */
62 int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
63 {
64    int stat;
65
66    if ((stat = pthread_attr_init(&wq->attr)) != 0) {
67       return stat;
68    }
69    if ((stat = pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
70       pthread_attr_destroy(&wq->attr);
71       return stat;
72    }
73    if ((stat = pthread_mutex_init(&wq->mutex, NULL)) != 0) {
74       pthread_attr_destroy(&wq->attr);
75       return stat;
76    }
77    if ((stat = pthread_cond_init(&wq->work, NULL)) != 0) {
78       pthread_mutex_destroy(&wq->mutex);
79       pthread_attr_destroy(&wq->attr);
80       return stat;
81    }
82    wq->quit = 0;
83    wq->first = wq->last = NULL;
84    wq->max_workers = threads;         /* max threads to create */
85    wq->num_workers = 0;               /* no threads yet */
86    wq->idle_workers = 0;              /* no idle threads */
87    wq->engine = engine;               /* routine to run */
88    wq->valid = WORKQ_VALID;
89    return 0;
90 }
91
92 /*
93  * Destroy a work queue
94  *
95  * Returns: 0 on success
96  *          errno on failure
97  */
98 int workq_destroy(workq_t *wq)
99 {
100    int stat, stat1, stat2;
101
102   if (wq->valid != WORKQ_VALID) {
103      return EINVAL;
104   }
105   P(wq->mutex);
106   wq->valid = 0;                      /* prevent any more operations */
107
108   /*
109    * If any threads are active, wake them
110    */
111   if (wq->num_workers > 0) {
112      wq->quit = 1;
113      if (wq->idle_workers) {
114         if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
115            V(wq->mutex);
116            return stat;
117         }
118      }
119      while (wq->num_workers > 0) {
120         if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
121            V(wq->mutex);
122            return stat;
123         }
124      }
125   }
126   V(wq->mutex);
127   stat  = pthread_mutex_destroy(&wq->mutex);
128   stat1 = pthread_cond_destroy(&wq->work);
129   stat2 = pthread_attr_destroy(&wq->attr);
130   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
131 }
132
133
134 /*
135  *  Add work to a queue
136  *    wq is a queue that was created with workq_init
137  *    element is a user unique item that will be passed to the
138  *        processing routine
139  *    work_item will get internal work queue item -- if it is not NULL
140  *    priority if non-zero will cause the item to be placed on the
141  *        head of the list instead of the tail.
142  */
143 int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
144 {
145    int stat=0;
146    workq_ele_t *item;
147    pthread_t id;
148
149    Dmsg0(1400, "workq_add\n");
150    if (wq->valid != WORKQ_VALID) {
151       return EINVAL;
152    }
153
154    if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
155       return ENOMEM;
156    }
157    item->data = element;
158    item->next = NULL;
159    P(wq->mutex);
160
161    Dmsg0(1400, "add item to queue\n");
162    if (priority) {
163       /* Add to head of queue */
164       if (wq->first == NULL) {
165          wq->first = item;
166          wq->last = item;
167       } else {
168          item->next = wq->first;
169          wq->first = item;
170       }
171    } else {
172       /* Add to end of queue */
173       if (wq->first == NULL) {
174          wq->first = item;
175       } else {
176          wq->last->next = item;
177       }
178       wq->last = item;
179    }
180
181    /* if any threads are idle, wake one */
182    if (wq->idle_workers > 0) {
183       Dmsg0(1400, "Signal worker\n");
184       if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
185          V(wq->mutex);
186          return stat;
187       }
188    } else if (wq->num_workers < wq->max_workers) {
189       Dmsg0(1400, "Create worker thread\n");
190       /* No idle threads so create a new one */
191       set_thread_concurrency(wq->max_workers + 1);
192       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
193          V(wq->mutex);
194          return stat;
195       }
196       wq->num_workers++;
197    }
198    V(wq->mutex);
199    Dmsg0(1400, "Return workq_add\n");
200    /* Return work_item if requested */
201    if (work_item) {
202       *work_item = item;
203    }
204    return stat;
205 }
206
207 /*
208  *  Remove work from a queue
209  *    wq is a queue that was created with workq_init
210  *    work_item is an element of work
211  *
212  *   Note, it is "removed" by immediately calling a processing routine.
213  *    if you want to cancel it, you need to provide some external means
214  *    of doing so.
215  */
216 int workq_remove(workq_t *wq, workq_ele_t *work_item)
217 {
218    int stat, found = 0;
219    pthread_t id;
220    workq_ele_t *item, *prev;
221
222    Dmsg0(1400, "workq_remove\n");
223    if (wq->valid != WORKQ_VALID) {
224       return EINVAL;
225    }
226
227    P(wq->mutex);
228
229    for (prev=item=wq->first; item; item=item->next) {
230       if (item == work_item) {
231          found = 1;
232          break;
233       }
234       prev = item;
235    }
236    if (!found) {
237       return EINVAL;
238    }
239
240    /* Move item to be first on list */
241    if (wq->first != work_item) {
242       prev->next = work_item->next;
243       if (wq->last == work_item) {
244          wq->last = prev;
245       }
246       work_item->next = wq->first;
247       wq->first = work_item;
248    }
249
250    /* if any threads are idle, wake one */
251    if (wq->idle_workers > 0) {
252       Dmsg0(1400, "Signal worker\n");
253       if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
254          V(wq->mutex);
255          return stat;
256       }
257    } else {
258       Dmsg0(1400, "Create worker thread\n");
259       /* No idle threads so create a new one */
260       set_thread_concurrency(wq->max_workers + 1);
261       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
262          V(wq->mutex);
263          return stat;
264       }
265       wq->num_workers++;
266    }
267    V(wq->mutex);
268    Dmsg0(1400, "Return workq_remove\n");
269    return stat;
270 }
271
272
273 /*
274  * This is the worker thread that serves the work queue.
275  * In due course, it will call the user's engine.
276  */
277 extern "C"
278 void *workq_server(void *arg)
279 {
280    struct timespec timeout;
281    workq_t *wq = (workq_t *)arg;
282    workq_ele_t *we;
283    int stat, timedout;
284
285    Dmsg0(1400, "Start workq_server\n");
286    P(wq->mutex);
287    set_jcr_in_tsd(INVALID_JCR);
288
289    for (;;) {
290       struct timeval tv;
291       struct timezone tz;
292
293       Dmsg0(1400, "Top of for loop\n");
294       timedout = 0;
295       Dmsg0(1400, "gettimeofday()\n");
296       gettimeofday(&tv, &tz);
297       timeout.tv_nsec = 0;
298       timeout.tv_sec = tv.tv_sec + 2;
299
300       while (wq->first == NULL && !wq->quit) {
301          /*
302           * Wait 2 seconds, then if no more work, exit
303           */
304          Dmsg0(1400, "pthread_cond_timedwait()\n");
305          stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
306          Dmsg1(1400, "timedwait=%d\n", stat);
307          if (stat == ETIMEDOUT) {
308             timedout = 1;
309             break;
310          } else if (stat != 0) {
311             /* This shouldn't happen */
312             Dmsg0(1400, "This shouldn't happen\n");
313             wq->num_workers--;
314             V(wq->mutex);
315             return NULL;
316          }
317       }
318       we = wq->first;
319       if (we != NULL) {
320          wq->first = we->next;
321          if (wq->last == we) {
322             wq->last = NULL;
323          }
324          V(wq->mutex);
325          /* Call user's routine here */
326          Dmsg0(1400, "Calling user engine.\n");
327          wq->engine(we->data);
328          Dmsg0(1400, "Back from user engine.\n");
329          free(we);                    /* release work entry */
330          Dmsg0(1400, "relock mutex\n");
331          P(wq->mutex);
332          Dmsg0(1400, "Done lock mutex\n");
333       }
334       /*
335        * If no more work request, and we are asked to quit, then do it
336        */
337       if (wq->first == NULL && wq->quit) {
338          wq->num_workers--;
339          if (wq->num_workers == 0) {
340             Dmsg0(1400, "Wake up destroy routine\n");
341             /* Wake up destroy routine if he is waiting */
342             pthread_cond_broadcast(&wq->work);
343          }
344          Dmsg0(1400, "Unlock mutex\n");
345          V(wq->mutex);
346          Dmsg0(1400, "Return from workq_server\n");
347          return NULL;
348       }
349       Dmsg0(1400, "Check for work request\n");
350       /*
351        * If no more work requests, and we waited long enough, quit
352        */
353       Dmsg1(1400, "wq->first==NULL = %d\n", wq->first==NULL);
354       Dmsg1(1400, "timedout=%d\n", timedout);
355       if (wq->first == NULL && timedout) {
356          Dmsg0(1400, "break big loop\n");
357          wq->num_workers--;
358          break;
359       }
360       Dmsg0(1400, "Loop again\n");
361    } /* end of big for loop */
362
363    Dmsg0(1400, "unlock mutex\n");
364    V(wq->mutex);
365    Dmsg0(1400, "End workq_server\n");
366    return NULL;
367 }