]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/workq.c
Update copyright
[bacula/bacula] / bacula / src / lib / workq.c
1 /*
2  * Bacula work queue routines. Permits passing work to
3  *  multiple threads.
4  *
5  *  Kern Sibbald, January MMI
6  *
7  *   Version $Id$
8  *
9  *  This code adapted from "Programming with POSIX Threads", by
10  *    David R. Butenhof
11  *
12  * Example:
13  *
14  * static workq_t job_wq;    define work queue
15  *
16  *  Initialize queue
17  *  if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
18  *     Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", strerror(errno));
19  *   }
20  *
21  *  Add an item to the queue
22  *  if ((stat = workq_add(&job_wq, (void *)jcr)) != 0) {
23  *      Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", strerror(errno));
24  *   }
25  *
26  *  Terminate the queue
27  *  workq_destroy(workq_t *wq);
28  *
29  */
30 /*
31    Bacula® - The Network Backup Solution
32
33    Copyright (C) 2000-2006 Free Software Foundation Europe e.V.
34
35    The main author of Bacula is Kern Sibbald, with contributions from
36    many others, a complete list can be found in the file AUTHORS.
37    This program is Free Software; you can redistribute it and/or
38    modify it under the terms of version two of the GNU General Public
39    License as published by the Free Software Foundation plus additions
40    that are listed in the file LICENSE.
41
42    This program is distributed in the hope that it will be useful, but
43    WITHOUT ANY WARRANTY; without even the implied warranty of
44    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
45    General Public License for more details.
46
47    You should have received a copy of the GNU General Public License
48    along with this program; if not, write to the Free Software
49    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
50    02110-1301, USA.
51
52    Bacula® is a registered trademark of John Walker.
53    The licensor of Bacula is the Free Software Foundation Europe
54    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
55    Switzerland, email:ftf@fsfeurope.org.
56 */
57
58 #include "bacula.h"
59
60 /* Forward referenced functions */
61 extern "C" void *workq_server(void *arg);
62
63 /*
64  * Initialize a work queue
65  *
66  *  Returns: 0 on success
67  *           errno on failure
68  */
69 int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
70 {
71    int stat;
72
73    if ((stat = pthread_attr_init(&wq->attr)) != 0) {
74       return stat;
75    }
76    if ((stat = pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
77       pthread_attr_destroy(&wq->attr);
78       return stat;
79    }
80    if ((stat = pthread_mutex_init(&wq->mutex, NULL)) != 0) {
81       pthread_attr_destroy(&wq->attr);
82       return stat;
83    }
84    if ((stat = pthread_cond_init(&wq->work, NULL)) != 0) {
85       pthread_mutex_destroy(&wq->mutex);
86       pthread_attr_destroy(&wq->attr);
87       return stat;
88    }
89    wq->quit = 0;
90    wq->first = wq->last = NULL;
91    wq->max_workers = threads;         /* max threads to create */
92    wq->num_workers = 0;               /* no threads yet */
93    wq->idle_workers = 0;              /* no idle threads */
94    wq->engine = engine;               /* routine to run */
95    wq->valid = WORKQ_VALID;
96    return 0;
97 }
98
99 /*
100  * Destroy a work queue
101  *
102  * Returns: 0 on success
103  *          errno on failure
104  */
105 int workq_destroy(workq_t *wq)
106 {
107    int stat, stat1, stat2;
108
109   if (wq->valid != WORKQ_VALID) {
110      return EINVAL;
111   }
112   if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
113      return stat;
114   }
115   wq->valid = 0;                      /* prevent any more operations */
116
117   /*
118    * If any threads are active, wake them
119    */
120   if (wq->num_workers > 0) {
121      wq->quit = 1;
122      if (wq->idle_workers) {
123         if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
124            pthread_mutex_unlock(&wq->mutex);
125            return stat;
126         }
127      }
128      while (wq->num_workers > 0) {
129         if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
130            pthread_mutex_unlock(&wq->mutex);
131            return stat;
132         }
133      }
134   }
135   if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
136      return stat;
137   }
138   stat  = pthread_mutex_destroy(&wq->mutex);
139   stat1 = pthread_cond_destroy(&wq->work);
140   stat2 = pthread_attr_destroy(&wq->attr);
141   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
142 }
143
144
145 /*
146  *  Add work to a queue
147  *    wq is a queue that was created with workq_init
148  *    element is a user unique item that will be passed to the
149  *        processing routine
150  *    work_item will get internal work queue item -- if it is not NULL
151  *    priority if non-zero will cause the item to be placed on the
152  *        head of the list instead of the tail.
153  */
154 int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
155 {
156    int stat;
157    workq_ele_t *item;
158    pthread_t id;
159
160    Dmsg0(1400, "workq_add\n");
161    if (wq->valid != WORKQ_VALID) {
162       return EINVAL;
163    }
164
165    if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
166       return ENOMEM;
167    }
168    item->data = element;
169    item->next = NULL;
170    if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
171       free(item);
172       return stat;
173    }
174
175    Dmsg0(1400, "add item to queue\n");
176    if (priority) {
177       /* Add to head of queue */
178       if (wq->first == NULL) {
179          wq->first = item;
180          wq->last = item;
181       } else {
182          item->next = wq->first;
183          wq->first = item;
184       }
185    } else {
186       /* Add to end of queue */
187       if (wq->first == NULL) {
188          wq->first = item;
189       } else {
190          wq->last->next = item;
191       }
192       wq->last = item;
193    }
194
195    /* if any threads are idle, wake one */
196    if (wq->idle_workers > 0) {
197       Dmsg0(1400, "Signal worker\n");
198       if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
199          pthread_mutex_unlock(&wq->mutex);
200          return stat;
201       }
202    } else if (wq->num_workers < wq->max_workers) {
203       Dmsg0(1400, "Create worker thread\n");
204       /* No idle threads so create a new one */
205       set_thread_concurrency(wq->max_workers + 1);
206       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
207          pthread_mutex_unlock(&wq->mutex);
208          return stat;
209       }
210       wq->num_workers++;
211    }
212    pthread_mutex_unlock(&wq->mutex);
213    Dmsg0(1400, "Return workq_add\n");
214    /* Return work_item if requested */
215    if (work_item) {
216       *work_item = item;
217    }
218    return stat;
219 }
220
221 /*
222  *  Remove work from a queue
223  *    wq is a queue that was created with workq_init
224  *    work_item is an element of work
225  *
226  *   Note, it is "removed" by immediately calling a processing routine.
227  *    if you want to cancel it, you need to provide some external means
228  *    of doing so.
229  */
230 int workq_remove(workq_t *wq, workq_ele_t *work_item)
231 {
232    int stat, found = 0;
233    pthread_t id;
234    workq_ele_t *item, *prev;
235
236    Dmsg0(1400, "workq_remove\n");
237    if (wq->valid != WORKQ_VALID) {
238       return EINVAL;
239    }
240
241    if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
242       return stat;
243    }
244
245    for (prev=item=wq->first; item; item=item->next) {
246       if (item == work_item) {
247          found = 1;
248          break;
249       }
250       prev = item;
251    }
252    if (!found) {
253       return EINVAL;
254    }
255
256    /* Move item to be first on list */
257    if (wq->first != work_item) {
258       prev->next = work_item->next;
259       if (wq->last == work_item) {
260          wq->last = prev;
261       }
262       work_item->next = wq->first;
263       wq->first = work_item;
264    }
265
266    /* if any threads are idle, wake one */
267    if (wq->idle_workers > 0) {
268       Dmsg0(1400, "Signal worker\n");
269       if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
270          pthread_mutex_unlock(&wq->mutex);
271          return stat;
272       }
273    } else {
274       Dmsg0(1400, "Create worker thread\n");
275       /* No idle threads so create a new one */
276       set_thread_concurrency(wq->max_workers + 1);
277       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
278          pthread_mutex_unlock(&wq->mutex);
279          return stat;
280       }
281       wq->num_workers++;
282    }
283    pthread_mutex_unlock(&wq->mutex);
284    Dmsg0(1400, "Return workq_remove\n");
285    return stat;
286 }
287
288
289 /*
290  * This is the worker thread that serves the work queue.
291  * In due course, it will call the user's engine.
292  */
293 extern "C"
294 void *workq_server(void *arg)
295 {
296    struct timespec timeout;
297    workq_t *wq = (workq_t *)arg;
298    workq_ele_t *we;
299    int stat, timedout;
300
301    Dmsg0(1400, "Start workq_server\n");
302    if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
303       return NULL;
304    }
305
306    for (;;) {
307       struct timeval tv;
308       struct timezone tz;
309
310       Dmsg0(1400, "Top of for loop\n");
311       timedout = 0;
312       Dmsg0(1400, "gettimeofday()\n");
313       gettimeofday(&tv, &tz);
314       timeout.tv_nsec = 0;
315       timeout.tv_sec = tv.tv_sec + 2;
316
317       while (wq->first == NULL && !wq->quit) {
318          /*
319           * Wait 2 seconds, then if no more work, exit
320           */
321          Dmsg0(1400, "pthread_cond_timedwait()\n");
322 #ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN
323          /* CYGWIN dies with a page fault the second
324           * time that pthread_cond_timedwait() is called
325           * so fake it out.
326           */
327          pthread_mutex_lock(&wq->mutex);
328          stat = ETIMEDOUT;
329 #else
330          stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
331 #endif
332          Dmsg1(1400, "timedwait=%d\n", stat);
333          if (stat == ETIMEDOUT) {
334             timedout = 1;
335             break;
336          } else if (stat != 0) {
337             /* This shouldn't happen */
338             Dmsg0(1400, "This shouldn't happen\n");
339             wq->num_workers--;
340             pthread_mutex_unlock(&wq->mutex);
341             return NULL;
342          }
343       }
344       we = wq->first;
345       if (we != NULL) {
346          wq->first = we->next;
347          if (wq->last == we) {
348             wq->last = NULL;
349          }
350          if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
351             return NULL;
352          }
353          /* Call user's routine here */
354          Dmsg0(1400, "Calling user engine.\n");
355          wq->engine(we->data);
356          Dmsg0(1400, "Back from user engine.\n");
357          free(we);                    /* release work entry */
358          Dmsg0(1400, "relock mutex\n");
359          if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
360             return NULL;
361          }
362          Dmsg0(1400, "Done lock mutex\n");
363       }
364       /*
365        * If no more work request, and we are asked to quit, then do it
366        */
367       if (wq->first == NULL && wq->quit) {
368          wq->num_workers--;
369          if (wq->num_workers == 0) {
370             Dmsg0(1400, "Wake up destroy routine\n");
371             /* Wake up destroy routine if he is waiting */
372             pthread_cond_broadcast(&wq->work);
373          }
374          Dmsg0(1400, "Unlock mutex\n");
375          pthread_mutex_unlock(&wq->mutex);
376          Dmsg0(1400, "Return from workq_server\n");
377          return NULL;
378       }
379       Dmsg0(1400, "Check for work request\n");
380       /*
381        * If no more work requests, and we waited long enough, quit
382        */
383       Dmsg1(1400, "wq->first==NULL = %d\n", wq->first==NULL);
384       Dmsg1(1400, "timedout=%d\n", timedout);
385       if (wq->first == NULL && timedout) {
386          Dmsg0(1400, "break big loop\n");
387          wq->num_workers--;
388          break;
389       }
390       Dmsg0(1400, "Loop again\n");
391    } /* end of big for loop */
392
393    Dmsg0(1400, "unlock mutex\n");
394    pthread_mutex_unlock(&wq->mutex);
395    Dmsg0(1400, "End workq_server\n");
396    return NULL;
397 }