]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/workq.c
kes Fix sscanf problems reported by Peter Buschman that caused
[bacula/bacula] / bacula / src / lib / workq.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2001-2007 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation plus additions
11    that are listed in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of John Walker.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula work queue routines. Permits passing work to
30  *  multiple threads.
31  *
32  *  Kern Sibbald, January MMI
33  *
34  *   Version $Id$
35  *
36  *  This code adapted from "Programming with POSIX Threads", by
37  *    David R. Butenhof
38  *
39  * Example:
40  *
41  * static workq_t job_wq;    define work queue
42  *
43  *  Initialize queue
44  *  if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
45  *     berrno be;
46  *     Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", be.bstrerror(errno));
47  *   }
48  *
49  *  Add an item to the queue
50  *  if ((stat = workq_add(&job_wq, (void *)jcr)) != 0) {
51  *      berrno be;
52  *      Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", be.bstrerror(errno));
53  *   }
54  *
55  *  Terminate the queue
56  *  workq_destroy(workq_t *wq);
57  *
58  */
59
60 #include "bacula.h"
61
62 /* Forward referenced functions */
63 extern "C" void *workq_server(void *arg);
64
65 /*
66  * Initialize a work queue
67  *
68  *  Returns: 0 on success
69  *           errno on failure
70  */
71 int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
72 {
73    int stat;
74
75    if ((stat = pthread_attr_init(&wq->attr)) != 0) {
76       return stat;
77    }
78    if ((stat = pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
79       pthread_attr_destroy(&wq->attr);
80       return stat;
81    }
82    if ((stat = pthread_mutex_init(&wq->mutex, NULL)) != 0) {
83       pthread_attr_destroy(&wq->attr);
84       return stat;
85    }
86    if ((stat = pthread_cond_init(&wq->work, NULL)) != 0) {
87       pthread_mutex_destroy(&wq->mutex);
88       pthread_attr_destroy(&wq->attr);
89       return stat;
90    }
91    wq->quit = 0;
92    wq->first = wq->last = NULL;
93    wq->max_workers = threads;         /* max threads to create */
94    wq->num_workers = 0;               /* no threads yet */
95    wq->idle_workers = 0;              /* no idle threads */
96    wq->engine = engine;               /* routine to run */
97    wq->valid = WORKQ_VALID;
98    return 0;
99 }
100
101 /*
102  * Destroy a work queue
103  *
104  * Returns: 0 on success
105  *          errno on failure
106  */
107 int workq_destroy(workq_t *wq)
108 {
109    int stat, stat1, stat2;
110
111   if (wq->valid != WORKQ_VALID) {
112      return EINVAL;
113   }
114   if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
115      return stat;
116   }
117   wq->valid = 0;                      /* prevent any more operations */
118
119   /*
120    * If any threads are active, wake them
121    */
122   if (wq->num_workers > 0) {
123      wq->quit = 1;
124      if (wq->idle_workers) {
125         if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
126            pthread_mutex_unlock(&wq->mutex);
127            return stat;
128         }
129      }
130      while (wq->num_workers > 0) {
131         if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
132            pthread_mutex_unlock(&wq->mutex);
133            return stat;
134         }
135      }
136   }
137   if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
138      return stat;
139   }
140   stat  = pthread_mutex_destroy(&wq->mutex);
141   stat1 = pthread_cond_destroy(&wq->work);
142   stat2 = pthread_attr_destroy(&wq->attr);
143   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
144 }
145
146
147 /*
148  *  Add work to a queue
149  *    wq is a queue that was created with workq_init
150  *    element is a user unique item that will be passed to the
151  *        processing routine
152  *    work_item will get internal work queue item -- if it is not NULL
153  *    priority if non-zero will cause the item to be placed on the
154  *        head of the list instead of the tail.
155  */
156 int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
157 {
158    int stat;
159    workq_ele_t *item;
160    pthread_t id;
161
162    Dmsg0(1400, "workq_add\n");
163    if (wq->valid != WORKQ_VALID) {
164       return EINVAL;
165    }
166
167    if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
168       return ENOMEM;
169    }
170    item->data = element;
171    item->next = NULL;
172    if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
173       free(item);
174       return stat;
175    }
176
177    Dmsg0(1400, "add item to queue\n");
178    if (priority) {
179       /* Add to head of queue */
180       if (wq->first == NULL) {
181          wq->first = item;
182          wq->last = item;
183       } else {
184          item->next = wq->first;
185          wq->first = item;
186       }
187    } else {
188       /* Add to end of queue */
189       if (wq->first == NULL) {
190          wq->first = item;
191       } else {
192          wq->last->next = item;
193       }
194       wq->last = item;
195    }
196
197    /* if any threads are idle, wake one */
198    if (wq->idle_workers > 0) {
199       Dmsg0(1400, "Signal worker\n");
200       if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
201          pthread_mutex_unlock(&wq->mutex);
202          return stat;
203       }
204    } else if (wq->num_workers < wq->max_workers) {
205       Dmsg0(1400, "Create worker thread\n");
206       /* No idle threads so create a new one */
207       set_thread_concurrency(wq->max_workers + 1);
208       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
209          pthread_mutex_unlock(&wq->mutex);
210          return stat;
211       }
212       wq->num_workers++;
213    }
214    pthread_mutex_unlock(&wq->mutex);
215    Dmsg0(1400, "Return workq_add\n");
216    /* Return work_item if requested */
217    if (work_item) {
218       *work_item = item;
219    }
220    return stat;
221 }
222
223 /*
224  *  Remove work from a queue
225  *    wq is a queue that was created with workq_init
226  *    work_item is an element of work
227  *
228  *   Note, it is "removed" by immediately calling a processing routine.
229  *    if you want to cancel it, you need to provide some external means
230  *    of doing so.
231  */
232 int workq_remove(workq_t *wq, workq_ele_t *work_item)
233 {
234    int stat, found = 0;
235    pthread_t id;
236    workq_ele_t *item, *prev;
237
238    Dmsg0(1400, "workq_remove\n");
239    if (wq->valid != WORKQ_VALID) {
240       return EINVAL;
241    }
242
243    if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
244       return stat;
245    }
246
247    for (prev=item=wq->first; item; item=item->next) {
248       if (item == work_item) {
249          found = 1;
250          break;
251       }
252       prev = item;
253    }
254    if (!found) {
255       return EINVAL;
256    }
257
258    /* Move item to be first on list */
259    if (wq->first != work_item) {
260       prev->next = work_item->next;
261       if (wq->last == work_item) {
262          wq->last = prev;
263       }
264       work_item->next = wq->first;
265       wq->first = work_item;
266    }
267
268    /* if any threads are idle, wake one */
269    if (wq->idle_workers > 0) {
270       Dmsg0(1400, "Signal worker\n");
271       if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
272          pthread_mutex_unlock(&wq->mutex);
273          return stat;
274       }
275    } else {
276       Dmsg0(1400, "Create worker thread\n");
277       /* No idle threads so create a new one */
278       set_thread_concurrency(wq->max_workers + 1);
279       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
280          pthread_mutex_unlock(&wq->mutex);
281          return stat;
282       }
283       wq->num_workers++;
284    }
285    pthread_mutex_unlock(&wq->mutex);
286    Dmsg0(1400, "Return workq_remove\n");
287    return stat;
288 }
289
290
291 /*
292  * This is the worker thread that serves the work queue.
293  * In due course, it will call the user's engine.
294  */
295 extern "C"
296 void *workq_server(void *arg)
297 {
298    struct timespec timeout;
299    workq_t *wq = (workq_t *)arg;
300    workq_ele_t *we;
301    int stat, timedout;
302
303    Dmsg0(1400, "Start workq_server\n");
304    if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
305       return NULL;
306    }
307
308    for (;;) {
309       struct timeval tv;
310       struct timezone tz;
311
312       Dmsg0(1400, "Top of for loop\n");
313       timedout = 0;
314       Dmsg0(1400, "gettimeofday()\n");
315       gettimeofday(&tv, &tz);
316       timeout.tv_nsec = 0;
317       timeout.tv_sec = tv.tv_sec + 2;
318
319       while (wq->first == NULL && !wq->quit) {
320          /*
321           * Wait 2 seconds, then if no more work, exit
322           */
323          Dmsg0(1400, "pthread_cond_timedwait()\n");
324 #ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN
325          /* CYGWIN dies with a page fault the second
326           * time that pthread_cond_timedwait() is called
327           * so fake it out.
328           */
329          pthread_mutex_lock(&wq->mutex);
330          stat = ETIMEDOUT;
331 #else
332          stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
333 #endif
334          Dmsg1(1400, "timedwait=%d\n", stat);
335          if (stat == ETIMEDOUT) {
336             timedout = 1;
337             break;
338          } else if (stat != 0) {
339             /* This shouldn't happen */
340             Dmsg0(1400, "This shouldn't happen\n");
341             wq->num_workers--;
342             pthread_mutex_unlock(&wq->mutex);
343             return NULL;
344          }
345       }
346       we = wq->first;
347       if (we != NULL) {
348          wq->first = we->next;
349          if (wq->last == we) {
350             wq->last = NULL;
351          }
352          if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
353             return NULL;
354          }
355          /* Call user's routine here */
356          Dmsg0(1400, "Calling user engine.\n");
357          wq->engine(we->data);
358          Dmsg0(1400, "Back from user engine.\n");
359          free(we);                    /* release work entry */
360          Dmsg0(1400, "relock mutex\n");
361          if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
362             return NULL;
363          }
364          Dmsg0(1400, "Done lock mutex\n");
365       }
366       /*
367        * If no more work request, and we are asked to quit, then do it
368        */
369       if (wq->first == NULL && wq->quit) {
370          wq->num_workers--;
371          if (wq->num_workers == 0) {
372             Dmsg0(1400, "Wake up destroy routine\n");
373             /* Wake up destroy routine if he is waiting */
374             pthread_cond_broadcast(&wq->work);
375          }
376          Dmsg0(1400, "Unlock mutex\n");
377          pthread_mutex_unlock(&wq->mutex);
378          Dmsg0(1400, "Return from workq_server\n");
379          return NULL;
380       }
381       Dmsg0(1400, "Check for work request\n");
382       /*
383        * If no more work requests, and we waited long enough, quit
384        */
385       Dmsg1(1400, "wq->first==NULL = %d\n", wq->first==NULL);
386       Dmsg1(1400, "timedout=%d\n", timedout);
387       if (wq->first == NULL && timedout) {
388          Dmsg0(1400, "break big loop\n");
389          wq->num_workers--;
390          break;
391       }
392       Dmsg0(1400, "Loop again\n");
393    } /* end of big for loop */
394
395    Dmsg0(1400, "unlock mutex\n");
396    pthread_mutex_unlock(&wq->mutex);
397    Dmsg0(1400, "End workq_server\n");
398    return NULL;
399 }