]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/workq.c
kes Prepare to add JS_Warnings termination status.
[bacula/bacula] / bacula / src / lib / workq.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2001-2008 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula work queue routines. Permits passing work to
30  *  multiple threads.
31  *
32  *  Kern Sibbald, January MMI
33  *
34  *   Version $Id$
35  *
36  *  This code adapted from "Programming with POSIX Threads", by
37  *    David R. Butenhof
38  *
39  * Example:
40  *
41  * static workq_t job_wq;    define work queue
42  *
43  *  Initialize queue
44  *  if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
45  *     berrno be;
46  *     Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", be.bstrerror(errno));
47  *   }
48  *
49  *  Add an item to the queue
50  *  if ((stat = workq_add(&job_wq, (void *)jcr)) != 0) {
51  *      berrno be;
52  *      Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", be.bstrerror(errno));
53  *   }
54  *
55  *  Terminate the queue
56  *  workq_destroy(workq_t *wq);
57  *
58  */
59
60 #include "bacula.h"
61 #include "jcr.h"
62
63 /* Forward referenced functions */
64 extern "C" void *workq_server(void *arg);
65
66 /*
67  * Initialize a work queue
68  *
69  *  Returns: 0 on success
70  *           errno on failure
71  */
72 int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
73 {
74    int stat;
75
76    if ((stat = pthread_attr_init(&wq->attr)) != 0) {
77       return stat;
78    }
79    if ((stat = pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
80       pthread_attr_destroy(&wq->attr);
81       return stat;
82    }
83    if ((stat = pthread_mutex_init(&wq->mutex, NULL)) != 0) {
84       pthread_attr_destroy(&wq->attr);
85       return stat;
86    }
87    if ((stat = pthread_cond_init(&wq->work, NULL)) != 0) {
88       pthread_mutex_destroy(&wq->mutex);
89       pthread_attr_destroy(&wq->attr);
90       return stat;
91    }
92    wq->quit = 0;
93    wq->first = wq->last = NULL;
94    wq->max_workers = threads;         /* max threads to create */
95    wq->num_workers = 0;               /* no threads yet */
96    wq->idle_workers = 0;              /* no idle threads */
97    wq->engine = engine;               /* routine to run */
98    wq->valid = WORKQ_VALID;
99    return 0;
100 }
101
102 /*
103  * Destroy a work queue
104  *
105  * Returns: 0 on success
106  *          errno on failure
107  */
108 int workq_destroy(workq_t *wq)
109 {
110    int stat, stat1, stat2;
111
112   if (wq->valid != WORKQ_VALID) {
113      return EINVAL;
114   }
115   P(wq->mutex);
116   wq->valid = 0;                      /* prevent any more operations */
117
118   /*
119    * If any threads are active, wake them
120    */
121   if (wq->num_workers > 0) {
122      wq->quit = 1;
123      if (wq->idle_workers) {
124         if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
125            V(wq->mutex);
126            return stat;
127         }
128      }
129      while (wq->num_workers > 0) {
130         if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
131            V(wq->mutex);
132            return stat;
133         }
134      }
135   }
136   V(wq->mutex);
137   stat  = pthread_mutex_destroy(&wq->mutex);
138   stat1 = pthread_cond_destroy(&wq->work);
139   stat2 = pthread_attr_destroy(&wq->attr);
140   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
141 }
142
143
144 /*
145  *  Add work to a queue
146  *    wq is a queue that was created with workq_init
147  *    element is a user unique item that will be passed to the
148  *        processing routine
149  *    work_item will get internal work queue item -- if it is not NULL
150  *    priority if non-zero will cause the item to be placed on the
151  *        head of the list instead of the tail.
152  */
153 int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
154 {
155    int stat=0;
156    workq_ele_t *item;
157    pthread_t id;
158
159    Dmsg0(1400, "workq_add\n");
160    if (wq->valid != WORKQ_VALID) {
161       return EINVAL;
162    }
163
164    if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
165       return ENOMEM;
166    }
167    item->data = element;
168    item->next = NULL;
169    P(wq->mutex);
170
171    Dmsg0(1400, "add item to queue\n");
172    if (priority) {
173       /* Add to head of queue */
174       if (wq->first == NULL) {
175          wq->first = item;
176          wq->last = item;
177       } else {
178          item->next = wq->first;
179          wq->first = item;
180       }
181    } else {
182       /* Add to end of queue */
183       if (wq->first == NULL) {
184          wq->first = item;
185       } else {
186          wq->last->next = item;
187       }
188       wq->last = item;
189    }
190
191    /* if any threads are idle, wake one */
192    if (wq->idle_workers > 0) {
193       Dmsg0(1400, "Signal worker\n");
194       if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
195          V(wq->mutex);
196          return stat;
197       }
198    } else if (wq->num_workers < wq->max_workers) {
199       Dmsg0(1400, "Create worker thread\n");
200       /* No idle threads so create a new one */
201       set_thread_concurrency(wq->max_workers + 1);
202       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
203          V(wq->mutex);
204          return stat;
205       }
206       wq->num_workers++;
207    }
208    V(wq->mutex);
209    Dmsg0(1400, "Return workq_add\n");
210    /* Return work_item if requested */
211    if (work_item) {
212       *work_item = item;
213    }
214    return stat;
215 }
216
217 /*
218  *  Remove work from a queue
219  *    wq is a queue that was created with workq_init
220  *    work_item is an element of work
221  *
222  *   Note, it is "removed" by immediately calling a processing routine.
223  *    if you want to cancel it, you need to provide some external means
224  *    of doing so.
225  */
226 int workq_remove(workq_t *wq, workq_ele_t *work_item)
227 {
228    int stat, found = 0;
229    pthread_t id;
230    workq_ele_t *item, *prev;
231
232    Dmsg0(1400, "workq_remove\n");
233    if (wq->valid != WORKQ_VALID) {
234       return EINVAL;
235    }
236
237    P(wq->mutex);
238
239    for (prev=item=wq->first; item; item=item->next) {
240       if (item == work_item) {
241          found = 1;
242          break;
243       }
244       prev = item;
245    }
246    if (!found) {
247       return EINVAL;
248    }
249
250    /* Move item to be first on list */
251    if (wq->first != work_item) {
252       prev->next = work_item->next;
253       if (wq->last == work_item) {
254          wq->last = prev;
255       }
256       work_item->next = wq->first;
257       wq->first = work_item;
258    }
259
260    /* if any threads are idle, wake one */
261    if (wq->idle_workers > 0) {
262       Dmsg0(1400, "Signal worker\n");
263       if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
264          V(wq->mutex);
265          return stat;
266       }
267    } else {
268       Dmsg0(1400, "Create worker thread\n");
269       /* No idle threads so create a new one */
270       set_thread_concurrency(wq->max_workers + 1);
271       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
272          V(wq->mutex);
273          return stat;
274       }
275       wq->num_workers++;
276    }
277    V(wq->mutex);
278    Dmsg0(1400, "Return workq_remove\n");
279    return stat;
280 }
281
282
283 /*
284  * This is the worker thread that serves the work queue.
285  * In due course, it will call the user's engine.
286  */
287 extern "C"
288 void *workq_server(void *arg)
289 {
290    struct timespec timeout;
291    workq_t *wq = (workq_t *)arg;
292    workq_ele_t *we;
293    int stat, timedout;
294
295    Dmsg0(1400, "Start workq_server\n");
296    P(wq->mutex);
297    set_jcr_in_tsd(INVALID_JCR);
298
299    for (;;) {
300       struct timeval tv;
301       struct timezone tz;
302
303       Dmsg0(1400, "Top of for loop\n");
304       timedout = 0;
305       Dmsg0(1400, "gettimeofday()\n");
306       gettimeofday(&tv, &tz);
307       timeout.tv_nsec = 0;
308       timeout.tv_sec = tv.tv_sec + 2;
309
310       while (wq->first == NULL && !wq->quit) {
311          /*
312           * Wait 2 seconds, then if no more work, exit
313           */
314          Dmsg0(1400, "pthread_cond_timedwait()\n");
315 #ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN
316          /* CYGWIN dies with a page fault the second
317           * time that pthread_cond_timedwait() is called
318           * so fake it out.
319           */
320          P(wq->mutex);
321          stat = ETIMEDOUT;
322 #else
323          stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
324 #endif
325          Dmsg1(1400, "timedwait=%d\n", stat);
326          if (stat == ETIMEDOUT) {
327             timedout = 1;
328             break;
329          } else if (stat != 0) {
330             /* This shouldn't happen */
331             Dmsg0(1400, "This shouldn't happen\n");
332             wq->num_workers--;
333             V(wq->mutex);
334             return NULL;
335          }
336       }
337       we = wq->first;
338       if (we != NULL) {
339          wq->first = we->next;
340          if (wq->last == we) {
341             wq->last = NULL;
342          }
343          V(wq->mutex);
344          /* Call user's routine here */
345          Dmsg0(1400, "Calling user engine.\n");
346          wq->engine(we->data);
347          Dmsg0(1400, "Back from user engine.\n");
348          free(we);                    /* release work entry */
349          Dmsg0(1400, "relock mutex\n");
350          P(wq->mutex);
351          Dmsg0(1400, "Done lock mutex\n");
352       }
353       /*
354        * If no more work request, and we are asked to quit, then do it
355        */
356       if (wq->first == NULL && wq->quit) {
357          wq->num_workers--;
358          if (wq->num_workers == 0) {
359             Dmsg0(1400, "Wake up destroy routine\n");
360             /* Wake up destroy routine if he is waiting */
361             pthread_cond_broadcast(&wq->work);
362          }
363          Dmsg0(1400, "Unlock mutex\n");
364          V(wq->mutex);
365          Dmsg0(1400, "Return from workq_server\n");
366          return NULL;
367       }
368       Dmsg0(1400, "Check for work request\n");
369       /*
370        * If no more work requests, and we waited long enough, quit
371        */
372       Dmsg1(1400, "wq->first==NULL = %d\n", wq->first==NULL);
373       Dmsg1(1400, "timedout=%d\n", timedout);
374       if (wq->first == NULL && timedout) {
375          Dmsg0(1400, "break big loop\n");
376          wq->num_workers--;
377          break;
378       }
379       Dmsg0(1400, "Loop again\n");
380    } /* end of big for loop */
381
382    Dmsg0(1400, "unlock mutex\n");
383    V(wq->mutex);
384    Dmsg0(1400, "End workq_server\n");
385    return NULL;
386 }