]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/workq.c
Backport from BEE
[bacula/bacula] / bacula / src / lib / workq.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2001-2014 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from many
7    others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    Bacula® is a registered trademark of Kern Sibbald.
15 */
16 /*
17  * Bacula work queue routines. Permits passing work to
18  *  multiple threads.
19  *
20  *  Kern Sibbald, January MMI
21  *
22  *  This code adapted from "Programming with POSIX Threads", by
23  *    David R. Butenhof
24  *
25  * Example:
26  *
27  * static workq_t job_wq;    define work queue
28  *
29  *  Initialize queue
30  *  if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
31  *     berrno be;
32  *     Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", be.bstrerror(errno));
33  *   }
34  *
35  *  Add an item to the queue
36  *  if ((stat = workq_add(&job_wq, (void *)jcr)) != 0) {
37  *      berrno be;
38  *      Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", be.bstrerror(errno));
39  *   }
40  *
41  *  Terminate the queue
42  *  workq_destroy(workq_t *wq);
43  *
44  */
45
46 #include "bacula.h"
47 #include "jcr.h"
48
49 /* Forward referenced functions */
50 extern "C" void *workq_server(void *arg);
51
52 /*
53  * Initialize a work queue
54  *
55  *  Returns: 0 on success
56  *           errno on failure
57  */
58 int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
59 {
60    int stat;
61
62    if ((stat = pthread_attr_init(&wq->attr)) != 0) {
63       return stat;
64    }
65    if ((stat = pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
66       pthread_attr_destroy(&wq->attr);
67       return stat;
68    }
69    if ((stat = pthread_mutex_init(&wq->mutex, NULL)) != 0) {
70       pthread_attr_destroy(&wq->attr);
71       return stat;
72    }
73    if ((stat = pthread_cond_init(&wq->work, NULL)) != 0) {
74       pthread_mutex_destroy(&wq->mutex);
75       pthread_attr_destroy(&wq->attr);
76       return stat;
77    }
78    wq->quit = 0;
79    wq->first = wq->last = NULL;
80    wq->max_workers = threads;         /* max threads to create */
81    wq->num_workers = 0;               /* no threads yet */
82    wq->idle_workers = 0;              /* no idle threads */
83    wq->engine = engine;               /* routine to run */
84    wq->valid = WORKQ_VALID;
85    return 0;
86 }
87
88 /*
89  * Destroy a work queue
90  *
91  * Returns: 0 on success
92  *          errno on failure
93  */
94 int workq_destroy(workq_t *wq)
95 {
96    int stat, stat1, stat2;
97
98   if (wq->valid != WORKQ_VALID) {
99      return EINVAL;
100   }
101   P(wq->mutex);
102   wq->valid = 0;                      /* prevent any more operations */
103
104   /*
105    * If any threads are active, wake them
106    */
107   if (wq->num_workers > 0) {
108      wq->quit = 1;
109      if (wq->idle_workers) {
110         if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
111            V(wq->mutex);
112            return stat;
113         }
114      }
115      while (wq->num_workers > 0) {
116         if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
117            V(wq->mutex);
118            return stat;
119         }
120      }
121   }
122   V(wq->mutex);
123   stat  = pthread_mutex_destroy(&wq->mutex);
124   stat1 = pthread_cond_destroy(&wq->work);
125   stat2 = pthread_attr_destroy(&wq->attr);
126   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
127 }
128
129
130 /*
131  *  Add work to a queue
132  *    wq is a queue that was created with workq_init
133  *    element is a user unique item that will be passed to the
134  *        processing routine
135  *    work_item will get internal work queue item -- if it is not NULL
136  *    priority if non-zero will cause the item to be placed on the
137  *        head of the list instead of the tail.
138  */
139 int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
140 {
141    int stat=0;
142    workq_ele_t *item;
143    pthread_t id;
144
145    Dmsg0(1400, "workq_add\n");
146    if (wq->valid != WORKQ_VALID) {
147       return EINVAL;
148    }
149
150    if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
151       return ENOMEM;
152    }
153    item->data = element;
154    item->next = NULL;
155    P(wq->mutex);
156
157    Dmsg0(1400, "add item to queue\n");
158    if (priority) {
159       /* Add to head of queue */
160       if (wq->first == NULL) {
161          wq->first = item;
162          wq->last = item;
163       } else {
164          item->next = wq->first;
165          wq->first = item;
166       }
167    } else {
168       /* Add to end of queue */
169       if (wq->first == NULL) {
170          wq->first = item;
171       } else {
172          wq->last->next = item;
173       }
174       wq->last = item;
175    }
176
177    /* if any threads are idle, wake one */
178    if (wq->idle_workers > 0) {
179       Dmsg0(1400, "Signal worker\n");
180       if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
181          V(wq->mutex);
182          return stat;
183       }
184    } else if (wq->num_workers < wq->max_workers) {
185       Dmsg0(1400, "Create worker thread\n");
186       /* No idle threads so create a new one */
187       set_thread_concurrency(wq->max_workers + 1);
188       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
189          V(wq->mutex);
190          return stat;
191       }
192       wq->num_workers++;
193    }
194    V(wq->mutex);
195    Dmsg0(1400, "Return workq_add\n");
196    /* Return work_item if requested */
197    if (work_item) {
198       *work_item = item;
199    }
200    return stat;
201 }
202
203 /*
204  *  Remove work from a queue
205  *    wq is a queue that was created with workq_init
206  *    work_item is an element of work
207  *
208  *   Note, it is "removed" by immediately calling a processing routine.
209  *    if you want to cancel it, you need to provide some external means
210  *    of doing so.
211  */
212 int workq_remove(workq_t *wq, workq_ele_t *work_item)
213 {
214    int stat, found = 0;
215    pthread_t id;
216    workq_ele_t *item, *prev;
217
218    Dmsg0(1400, "workq_remove\n");
219    if (wq->valid != WORKQ_VALID) {
220       return EINVAL;
221    }
222
223    P(wq->mutex);
224
225    for (prev=item=wq->first; item; item=item->next) {
226       if (item == work_item) {
227          found = 1;
228          break;
229       }
230       prev = item;
231    }
232    if (!found) {
233       return EINVAL;
234    }
235
236    /* Move item to be first on list */
237    if (wq->first != work_item) {
238       prev->next = work_item->next;
239       if (wq->last == work_item) {
240          wq->last = prev;
241       }
242       work_item->next = wq->first;
243       wq->first = work_item;
244    }
245
246    /* if any threads are idle, wake one */
247    if (wq->idle_workers > 0) {
248       Dmsg0(1400, "Signal worker\n");
249       if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
250          V(wq->mutex);
251          return stat;
252       }
253    } else {
254       Dmsg0(1400, "Create worker thread\n");
255       /* No idle threads so create a new one */
256       set_thread_concurrency(wq->max_workers + 1);
257       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
258          V(wq->mutex);
259          return stat;
260       }
261       wq->num_workers++;
262    }
263    V(wq->mutex);
264    Dmsg0(1400, "Return workq_remove\n");
265    return stat;
266 }
267
268
269 /*
270  * This is the worker thread that serves the work queue.
271  * In due course, it will call the user's engine.
272  */
273 extern "C"
274 void *workq_server(void *arg)
275 {
276    struct timespec timeout;
277    workq_t *wq = (workq_t *)arg;
278    workq_ele_t *we;
279    int stat, timedout;
280
281    Dmsg0(1400, "Start workq_server\n");
282    P(wq->mutex);
283    set_jcr_in_tsd(INVALID_JCR);
284
285    for (;;) {
286       struct timeval tv;
287       struct timezone tz;
288
289       Dmsg0(1400, "Top of for loop\n");
290       timedout = 0;
291       Dmsg0(1400, "gettimeofday()\n");
292       gettimeofday(&tv, &tz);
293       timeout.tv_nsec = 0;
294       timeout.tv_sec = tv.tv_sec + 2;
295
296       while (wq->first == NULL && !wq->quit) {
297          /*
298           * Wait 2 seconds, then if no more work, exit
299           */
300          Dmsg0(1400, "pthread_cond_timedwait()\n");
301          stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
302          Dmsg1(1400, "timedwait=%d\n", stat);
303          if (stat == ETIMEDOUT) {
304             timedout = 1;
305             break;
306          } else if (stat != 0) {
307             /* This shouldn't happen */
308             Dmsg0(1400, "This shouldn't happen\n");
309             wq->num_workers--;
310             V(wq->mutex);
311             return NULL;
312          }
313       }
314       we = wq->first;
315       if (we != NULL) {
316          wq->first = we->next;
317          if (wq->last == we) {
318             wq->last = NULL;
319          }
320          V(wq->mutex);
321          /* Call user's routine here */
322          Dmsg0(1400, "Calling user engine.\n");
323          wq->engine(we->data);
324          Dmsg0(1400, "Back from user engine.\n");
325          free(we);                    /* release work entry */
326          Dmsg0(1400, "relock mutex\n");
327          P(wq->mutex);
328          Dmsg0(1400, "Done lock mutex\n");
329       }
330       /*
331        * If no more work request, and we are asked to quit, then do it
332        */
333       if (wq->first == NULL && wq->quit) {
334          wq->num_workers--;
335          if (wq->num_workers == 0) {
336             Dmsg0(1400, "Wake up destroy routine\n");
337             /* Wake up destroy routine if he is waiting */
338             pthread_cond_broadcast(&wq->work);
339          }
340          Dmsg0(1400, "Unlock mutex\n");
341          V(wq->mutex);
342          Dmsg0(1400, "Return from workq_server\n");
343          return NULL;
344       }
345       Dmsg0(1400, "Check for work request\n");
346       /*
347        * If no more work requests, and we waited long enough, quit
348        */
349       Dmsg1(1400, "wq->first==NULL = %d\n", wq->first==NULL);
350       Dmsg1(1400, "timedout=%d\n", timedout);
351       if (wq->first == NULL && timedout) {
352          Dmsg0(1400, "break big loop\n");
353          wq->num_workers--;
354          break;
355       }
356       Dmsg0(1400, "Loop again\n");
357    } /* end of big for loop */
358
359    Dmsg0(1400, "unlock mutex\n");
360    V(wq->mutex);
361    Dmsg0(1400, "End workq_server\n");
362    return NULL;
363 }