]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/workq.c
Apply Preben 'Peppe' Guldberg <peppe@wielders.org>
[bacula/bacula] / bacula / src / lib / workq.c
1 /*
2  * Bacula work queue routines. Permits passing work to
3  *  multiple threads.
4  *
5  *  Kern Sibbald, January MMI
6  *
7  *   Version $Id$
8  *
9  *  This code adapted from "Programming with POSIX Threads", by
10  *    David R. Butenhof
11  *
12  * Example:
13  *
14  * static workq_t job_wq;    define work queue
15  *
16  *  Initialize queue
17  *  if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
18  *     Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", strerror(errno));
19  *   }
20  *
21  *  Add an item to the queue
22  *  if ((stat = workq_add(&job_wq, (void *)jcr)) != 0) {
23  *      Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", strerror(errno));
24  *   }
25  *
26  *  Terminate the queue
27  *  workq_destroy(workq_t *wq);
28  *
29  */
30 /*
31    Copyright (C) 2000-2004 Kern Sibbald and John Walker
32
33    This program is free software; you can redistribute it and/or
34    modify it under the terms of the GNU General Public License as
35    published by the Free Software Foundation; either version 2 of
36    the License, or (at your option) any later version.
37
38    This program is distributed in the hope that it will be useful,
39    but WITHOUT ANY WARRANTY; without even the implied warranty of
40    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
41    General Public License for more details.
42
43    You should have received a copy of the GNU General Public
44    License along with this program; if not, write to the Free
45    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
46    MA 02111-1307, USA.
47
48  */
49
50 #include "bacula.h"
51
52 /* Forward referenced functions */
53 extern "C" void *workq_server(void *arg);
54
55 /*
56  * Initialize a work queue
57  *
58  *  Returns: 0 on success
59  *           errno on failure
60  */
61 int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg))
62 {
63    int stat;
64
65    if ((stat = pthread_attr_init(&wq->attr)) != 0) {
66       return stat;
67    }
68    if ((stat = pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
69       pthread_attr_destroy(&wq->attr);
70       return stat;
71    }
72    if ((stat = pthread_mutex_init(&wq->mutex, NULL)) != 0) {
73       pthread_attr_destroy(&wq->attr);
74       return stat;
75    }
76    if ((stat = pthread_cond_init(&wq->work, NULL)) != 0) {
77       pthread_mutex_destroy(&wq->mutex);
78       pthread_attr_destroy(&wq->attr);
79       return stat;
80    }
81    wq->quit = 0;
82    wq->first = wq->last = NULL;
83    wq->max_workers = threads;         /* max threads to create */
84    wq->num_workers = 0;               /* no threads yet */
85    wq->idle_workers = 0;              /* no idle threads */
86    wq->engine = engine;               /* routine to run */
87    wq->valid = WORKQ_VALID;
88    return 0;
89 }
90
91 /*
92  * Destroy a work queue
93  *
94  * Returns: 0 on success
95  *          errno on failure
96  */
97 int workq_destroy(workq_t *wq)
98 {
99    int stat, stat1, stat2;
100
101   if (wq->valid != WORKQ_VALID) {
102      return EINVAL;
103   }
104   if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
105      return stat;
106   }
107   wq->valid = 0;                      /* prevent any more operations */
108
109   /*
110    * If any threads are active, wake them
111    */
112   if (wq->num_workers > 0) {
113      wq->quit = 1;
114      if (wq->idle_workers) {
115         if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
116            pthread_mutex_unlock(&wq->mutex);
117            return stat;
118         }
119      }
120      while (wq->num_workers > 0) {
121         if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
122            pthread_mutex_unlock(&wq->mutex);
123            return stat;
124         }
125      }
126   }
127   if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
128      return stat;
129   }
130   stat  = pthread_mutex_destroy(&wq->mutex);
131   stat1 = pthread_cond_destroy(&wq->work);
132   stat2 = pthread_attr_destroy(&wq->attr);
133   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
134 }
135
136
137 /*
138  *  Add work to a queue
139  *    wq is a queue that was created with workq_init
140  *    element is a user unique item that will be passed to the
141  *        processing routine
142  *    work_item will get internal work queue item -- if it is not NULL
143  *    priority if non-zero will cause the item to be placed on the
144  *        head of the list instead of the tail.
145  */
146 int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
147 {
148    int stat;
149    workq_ele_t *item;
150    pthread_t id;
151
152    Dmsg0(400, "workq_add\n");
153    if (wq->valid != WORKQ_VALID) {
154       return EINVAL;
155    }
156
157    if ((item = (workq_ele_t *)malloc(sizeof(workq_ele_t))) == NULL) {
158       return ENOMEM;
159    }
160    item->data = element;
161    item->next = NULL;
162    if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
163       free(item);
164       return stat;
165    }
166
167    Dmsg0(400, "add item to queue\n");
168    if (priority) {
169       /* Add to head of queue */
170       if (wq->first == NULL) {
171          wq->first = item;
172          wq->last = item;
173       } else {
174          item->next = wq->first;
175          wq->first = item;
176       }
177    } else {
178       /* Add to end of queue */
179       if (wq->first == NULL) {
180          wq->first = item;
181       } else {
182          wq->last->next = item;
183       }
184       wq->last = item;
185    }
186
187    /* if any threads are idle, wake one */
188    if (wq->idle_workers > 0) {
189       Dmsg0(400, "Signal worker\n");
190       if ((stat = pthread_cond_signal(&wq->work)) != 0) {
191          pthread_mutex_unlock(&wq->mutex);
192          return stat;
193       }
194    } else if (wq->num_workers < wq->max_workers) {
195       Dmsg0(400, "Create worker thread\n");
196       /* No idle threads so create a new one */
197       set_thread_concurrency(wq->max_workers + 1);
198       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
199          pthread_mutex_unlock(&wq->mutex);
200          return stat;
201       }
202       wq->num_workers++;
203    }
204    pthread_mutex_unlock(&wq->mutex);
205    Dmsg0(400, "Return workq_add\n");
206    /* Return work_item if requested */
207    if (work_item) {
208       *work_item = item;
209    }
210    return stat;
211 }
212
213 /*
214  *  Remove work from a queue
215  *    wq is a queue that was created with workq_init
216  *    work_item is an element of work
217  *
218  *   Note, it is "removed" by immediately calling a processing routine.
219  *    if you want to cancel it, you need to provide some external means
220  *    of doing so.
221  */
222 int workq_remove(workq_t *wq, workq_ele_t *work_item)
223 {
224    int stat, found = 0;
225    pthread_t id;
226    workq_ele_t *item, *prev;
227
228    Dmsg0(400, "workq_remove\n");
229    if (wq->valid != WORKQ_VALID) {
230       return EINVAL;
231    }
232
233    if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
234       return stat;
235    }
236
237    for (prev=item=wq->first; item; item=item->next) {
238       if (item == work_item) {
239          found = 1;
240          break;
241       }
242       prev = item;
243    }
244    if (!found) {
245       return EINVAL;
246    }
247
248    /* Move item to be first on list */
249    if (wq->first != work_item) {
250       prev->next = work_item->next;
251       if (wq->last == work_item) {
252          wq->last = prev;
253       }
254       work_item->next = wq->first;
255       wq->first = work_item;
256    }
257
258    /* if any threads are idle, wake one */
259    if (wq->idle_workers > 0) {
260       Dmsg0(400, "Signal worker\n");
261       if ((stat = pthread_cond_signal(&wq->work)) != 0) {
262          pthread_mutex_unlock(&wq->mutex);
263          return stat;
264       }
265    } else {
266       Dmsg0(400, "Create worker thread\n");
267       /* No idle threads so create a new one */
268       set_thread_concurrency(wq->max_workers + 1);
269       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
270          pthread_mutex_unlock(&wq->mutex);
271          return stat;
272       }
273       wq->num_workers++;
274    }
275    pthread_mutex_unlock(&wq->mutex);
276    Dmsg0(400, "Return workq_remove\n");
277    return stat;
278 }
279
280
281 /*
282  * This is the worker thread that serves the work queue.
283  * In due course, it will call the user's engine.
284  */
285 extern "C"
286 void *workq_server(void *arg)
287 {
288    struct timespec timeout;
289    workq_t *wq = (workq_t *)arg;
290    workq_ele_t *we;
291    int stat, timedout;
292
293    Dmsg0(400, "Start workq_server\n");
294    if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
295       return NULL;
296    }
297
298    for (;;) {
299       struct timeval tv;
300       struct timezone tz;
301
302       Dmsg0(400, "Top of for loop\n");
303       timedout = 0;
304       Dmsg0(400, "gettimeofday()\n");
305       gettimeofday(&tv, &tz);
306       timeout.tv_nsec = 0;
307       timeout.tv_sec = tv.tv_sec + 2;
308
309       while (wq->first == NULL && !wq->quit) {
310          /*
311           * Wait 2 seconds, then if no more work, exit
312           */
313          Dmsg0(400, "pthread_cond_timedwait()\n");
314 #ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN
315          /* CYGWIN dies with a page fault the second
316           * time that pthread_cond_timedwait() is called
317           * so fake it out.
318           */
319          pthread_mutex_lock(&wq->mutex);
320          stat = ETIMEDOUT;
321 #else
322          stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
323 #endif
324          Dmsg1(400, "timedwait=%d\n", stat);
325          if (stat == ETIMEDOUT) {
326             timedout = 1;
327             break;
328          } else if (stat != 0) {
329             /* This shouldn't happen */
330             Dmsg0(400, "This shouldn't happen\n");
331             wq->num_workers--;
332             pthread_mutex_unlock(&wq->mutex);
333             return NULL;
334          }
335       }
336       we = wq->first;
337       if (we != NULL) {
338          wq->first = we->next;
339          if (wq->last == we) {
340             wq->last = NULL;
341          }
342          if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
343             return NULL;
344          }
345          /* Call user's routine here */
346          Dmsg0(400, "Calling user engine.\n");
347          wq->engine(we->data);
348          Dmsg0(400, "Back from user engine.\n");
349          free(we);                    /* release work entry */
350          Dmsg0(400, "relock mutex\n");
351          if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
352             return NULL;
353          }
354          Dmsg0(400, "Done lock mutex\n");
355       }
356       /*
357        * If no more work request, and we are asked to quit, then do it
358        */
359       if (wq->first == NULL && wq->quit) {
360          wq->num_workers--;
361          if (wq->num_workers == 0) {
362             Dmsg0(400, "Wake up destroy routine\n");
363             /* Wake up destroy routine if he is waiting */
364             pthread_cond_broadcast(&wq->work);
365          }
366          Dmsg0(400, "Unlock mutex\n");
367          pthread_mutex_unlock(&wq->mutex);
368          Dmsg0(400, "Return from workq_server\n");
369          return NULL;
370       }
371       Dmsg0(400, "Check for work request\n");
372       /*
373        * If no more work requests, and we waited long enough, quit
374        */
375       Dmsg1(400, "wq->first==NULL = %d\n", wq->first==NULL);
376       Dmsg1(400, "timedout=%d\n", timedout);
377       if (wq->first == NULL && timedout) {
378          Dmsg0(400, "break big loop\n");
379          wq->num_workers--;
380          break;
381       }
382       Dmsg0(400, "Loop again\n");
383    } /* end of big for loop */
384
385    Dmsg0(400, "unlock mutex\n");
386    pthread_mutex_unlock(&wq->mutex);
387    Dmsg0(400, "End workq_server\n");
388    return NULL;
389 }