]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/workq.c
Add missing files for xattr and eliminate a few compiler complaints
[bacula/bacula] / bacula / src / lib / workq.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2001-2008 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula work queue routines. Permits passing work to
30  *  multiple threads.
31  *
32  *  Kern Sibbald, January MMI
33  *
34  *   Version $Id$
35  *
36  *  This code adapted from "Programming with POSIX Threads", by
37  *    David R. Butenhof
38  *
39  * Example:
40  *
41  * static workq_t job_wq;    define work queue
42  *
43  *  Initialize queue
44  *  if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
45  *     berrno be;
46  *     Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", be.bstrerror(errno));
47  *   }
48  *
49  *  Add an item to the queue
50  *  if ((stat = workq_add(&job_wq, (void *)jcr)) != 0) {
51  *      berrno be;
52  *      Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", be.bstrerror(errno));
53  *   }
54  *
55  *  Terminate the queue
56  *  workq_destroy(workq_t *wq);
57  *
58  */
59
60 #include "bacula.h"
61 #include "jcr.h"
62
63 /* Forward referenced functions */
64 extern "C" void *workq_server(void *arg);
65
66 /*
67  * Initialize a work queue
68  *
69  *  Returns: 0 on success
70  *           errno on failure
71  */
72 int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
73 {
74    int stat;
75
76    if ((stat = pthread_attr_init(&wq->attr)) != 0) {
77       return stat;
78    }
79    if ((stat = pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
80       pthread_attr_destroy(&wq->attr);
81       return stat;
82    }
83    if ((stat = pthread_mutex_init(&wq->mutex, NULL)) != 0) {
84       pthread_attr_destroy(&wq->attr);
85       return stat;
86    }
87    if ((stat = pthread_cond_init(&wq->work, NULL)) != 0) {
88       pthread_mutex_destroy(&wq->mutex);
89       pthread_attr_destroy(&wq->attr);
90       return stat;
91    }
92    wq->quit = 0;
93    wq->first = wq->last = NULL;
94    wq->max_workers = threads;         /* max threads to create */
95    wq->num_workers = 0;               /* no threads yet */
96    wq->idle_workers = 0;              /* no idle threads */
97    wq->engine = engine;               /* routine to run */
98    wq->valid = WORKQ_VALID;
99    return 0;
100 }
101
102 /*
103  * Destroy a work queue
104  *
105  * Returns: 0 on success
106  *          errno on failure
107  */
108 int workq_destroy(workq_t *wq)
109 {
110    int stat, stat1, stat2;
111
112   if (wq->valid != WORKQ_VALID) {
113      return EINVAL;
114   }
115   if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
116      return stat;
117   }
118   wq->valid = 0;                      /* prevent any more operations */
119
120   /*
121    * If any threads are active, wake them
122    */
123   if (wq->num_workers > 0) {
124      wq->quit = 1;
125      if (wq->idle_workers) {
126         if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
127            pthread_mutex_unlock(&wq->mutex);
128            return stat;
129         }
130      }
131      while (wq->num_workers > 0) {
132         if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
133            pthread_mutex_unlock(&wq->mutex);
134            return stat;
135         }
136      }
137   }
138   if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
139      return stat;
140   }
141   stat  = pthread_mutex_destroy(&wq->mutex);
142   stat1 = pthread_cond_destroy(&wq->work);
143   stat2 = pthread_attr_destroy(&wq->attr);
144   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
145 }
146
147
148 /*
149  *  Add work to a queue
150  *    wq is a queue that was created with workq_init
151  *    element is a user unique item that will be passed to the
152  *        processing routine
153  *    work_item will get internal work queue item -- if it is not NULL
154  *    priority if non-zero will cause the item to be placed on the
155  *        head of the list instead of the tail.
156  */
157 int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
158 {
159    int stat;
160    workq_ele_t *item;
161    pthread_t id;
162
163    Dmsg0(1400, "workq_add\n");
164    if (wq->valid != WORKQ_VALID) {
165       return EINVAL;
166    }
167
168    if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
169       return ENOMEM;
170    }
171    item->data = element;
172    item->next = NULL;
173    if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
174       free(item);
175       return stat;
176    }
177
178    Dmsg0(1400, "add item to queue\n");
179    if (priority) {
180       /* Add to head of queue */
181       if (wq->first == NULL) {
182          wq->first = item;
183          wq->last = item;
184       } else {
185          item->next = wq->first;
186          wq->first = item;
187       }
188    } else {
189       /* Add to end of queue */
190       if (wq->first == NULL) {
191          wq->first = item;
192       } else {
193          wq->last->next = item;
194       }
195       wq->last = item;
196    }
197
198    /* if any threads are idle, wake one */
199    if (wq->idle_workers > 0) {
200       Dmsg0(1400, "Signal worker\n");
201       if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
202          pthread_mutex_unlock(&wq->mutex);
203          return stat;
204       }
205    } else if (wq->num_workers < wq->max_workers) {
206       Dmsg0(1400, "Create worker thread\n");
207       /* No idle threads so create a new one */
208       set_thread_concurrency(wq->max_workers + 1);
209       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
210          pthread_mutex_unlock(&wq->mutex);
211          return stat;
212       }
213       wq->num_workers++;
214    }
215    pthread_mutex_unlock(&wq->mutex);
216    Dmsg0(1400, "Return workq_add\n");
217    /* Return work_item if requested */
218    if (work_item) {
219       *work_item = item;
220    }
221    return stat;
222 }
223
224 /*
225  *  Remove work from a queue
226  *    wq is a queue that was created with workq_init
227  *    work_item is an element of work
228  *
229  *   Note, it is "removed" by immediately calling a processing routine.
230  *    if you want to cancel it, you need to provide some external means
231  *    of doing so.
232  */
233 int workq_remove(workq_t *wq, workq_ele_t *work_item)
234 {
235    int stat, found = 0;
236    pthread_t id;
237    workq_ele_t *item, *prev;
238
239    Dmsg0(1400, "workq_remove\n");
240    if (wq->valid != WORKQ_VALID) {
241       return EINVAL;
242    }
243
244    if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
245       return stat;
246    }
247
248    for (prev=item=wq->first; item; item=item->next) {
249       if (item == work_item) {
250          found = 1;
251          break;
252       }
253       prev = item;
254    }
255    if (!found) {
256       return EINVAL;
257    }
258
259    /* Move item to be first on list */
260    if (wq->first != work_item) {
261       prev->next = work_item->next;
262       if (wq->last == work_item) {
263          wq->last = prev;
264       }
265       work_item->next = wq->first;
266       wq->first = work_item;
267    }
268
269    /* if any threads are idle, wake one */
270    if (wq->idle_workers > 0) {
271       Dmsg0(1400, "Signal worker\n");
272       if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
273          pthread_mutex_unlock(&wq->mutex);
274          return stat;
275       }
276    } else {
277       Dmsg0(1400, "Create worker thread\n");
278       /* No idle threads so create a new one */
279       set_thread_concurrency(wq->max_workers + 1);
280       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
281          pthread_mutex_unlock(&wq->mutex);
282          return stat;
283       }
284       wq->num_workers++;
285    }
286    pthread_mutex_unlock(&wq->mutex);
287    Dmsg0(1400, "Return workq_remove\n");
288    return stat;
289 }
290
291
292 /*
293  * This is the worker thread that serves the work queue.
294  * In due course, it will call the user's engine.
295  */
296 extern "C"
297 void *workq_server(void *arg)
298 {
299    struct timespec timeout;
300    workq_t *wq = (workq_t *)arg;
301    workq_ele_t *we;
302    int stat, timedout;
303
304    Dmsg0(1400, "Start workq_server\n");
305    if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
306       return NULL;
307    }
308    set_jcr_in_tsd(INVALID_JCR);
309
310    for (;;) {
311       struct timeval tv;
312       struct timezone tz;
313
314       Dmsg0(1400, "Top of for loop\n");
315       timedout = 0;
316       Dmsg0(1400, "gettimeofday()\n");
317       gettimeofday(&tv, &tz);
318       timeout.tv_nsec = 0;
319       timeout.tv_sec = tv.tv_sec + 2;
320
321       while (wq->first == NULL && !wq->quit) {
322          /*
323           * Wait 2 seconds, then if no more work, exit
324           */
325          Dmsg0(1400, "pthread_cond_timedwait()\n");
326 #ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN
327          /* CYGWIN dies with a page fault the second
328           * time that pthread_cond_timedwait() is called
329           * so fake it out.
330           */
331          pthread_mutex_lock(&wq->mutex);
332          stat = ETIMEDOUT;
333 #else
334          stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
335 #endif
336          Dmsg1(1400, "timedwait=%d\n", stat);
337          if (stat == ETIMEDOUT) {
338             timedout = 1;
339             break;
340          } else if (stat != 0) {
341             /* This shouldn't happen */
342             Dmsg0(1400, "This shouldn't happen\n");
343             wq->num_workers--;
344             pthread_mutex_unlock(&wq->mutex);
345             return NULL;
346          }
347       }
348       we = wq->first;
349       if (we != NULL) {
350          wq->first = we->next;
351          if (wq->last == we) {
352             wq->last = NULL;
353          }
354          if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
355             return NULL;
356          }
357          /* Call user's routine here */
358          Dmsg0(1400, "Calling user engine.\n");
359          wq->engine(we->data);
360          Dmsg0(1400, "Back from user engine.\n");
361          free(we);                    /* release work entry */
362          Dmsg0(1400, "relock mutex\n");
363          if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
364             return NULL;
365          }
366          Dmsg0(1400, "Done lock mutex\n");
367       }
368       /*
369        * If no more work request, and we are asked to quit, then do it
370        */
371       if (wq->first == NULL && wq->quit) {
372          wq->num_workers--;
373          if (wq->num_workers == 0) {
374             Dmsg0(1400, "Wake up destroy routine\n");
375             /* Wake up destroy routine if he is waiting */
376             pthread_cond_broadcast(&wq->work);
377          }
378          Dmsg0(1400, "Unlock mutex\n");
379          pthread_mutex_unlock(&wq->mutex);
380          Dmsg0(1400, "Return from workq_server\n");
381          return NULL;
382       }
383       Dmsg0(1400, "Check for work request\n");
384       /*
385        * If no more work requests, and we waited long enough, quit
386        */
387       Dmsg1(1400, "wq->first==NULL = %d\n", wq->first==NULL);
388       Dmsg1(1400, "timedout=%d\n", timedout);
389       if (wq->first == NULL && timedout) {
390          Dmsg0(1400, "break big loop\n");
391          wq->num_workers--;
392          break;
393       }
394       Dmsg0(1400, "Loop again\n");
395    } /* end of big for loop */
396
397    Dmsg0(1400, "unlock mutex\n");
398    pthread_mutex_unlock(&wq->mutex);
399    Dmsg0(1400, "End workq_server\n");
400    return NULL;
401 }