]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/workq.c
Update file permission bits
[bacula/bacula] / bacula / src / lib / workq.c
1 /*
2  * Bacula work queue routines. Permits passing work to
3  *  multiple threads.
4  *
5  *  Kern Sibbald, January MMI
6  *
7  *   Version $Id$
8  *
9  *  This code adapted from "Programming with POSIX Threads", by
10  *    David R. Butenhof
11  *
12  * Example:
13  *
14  * static workq_t job_wq;    define work queue
15  *
16  *  Initialize queue
17  *  if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
18  *     Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", strerror(errno));
19  *   }
20  *
21  *  Add an item to the queue
22  *  if ((stat = workq_add(&job_wq, (void *)jcr)) != 0) {
23  *      Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", strerror(errno));
24  *   }
25  *
26  *  Terminate the queue
27  *  workq_destroy(workq_t *wq);
28  *
29  */
30 /*
31    Copyright (C) 2000-2003 Kern Sibbald and John Walker
32
33    This program is free software; you can redistribute it and/or
34    modify it under the terms of the GNU General Public License as
35    published by the Free Software Foundation; either version 2 of
36    the License, or (at your option) any later version.
37
38    This program is distributed in the hope that it will be useful,
39    but WITHOUT ANY WARRANTY; without even the implied warranty of
40    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
41    General Public License for more details.
42
43    You should have received a copy of the GNU General Public
44    License along with this program; if not, write to the Free
45    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
46    MA 02111-1307, USA.
47
48  */
49
50 #include "bacula.h"
51
52 /* Forward referenced functions */
53 static void *workq_server(void *arg);
54
55 /*   
56  * Initialize a work queue
57  *
58  *  Returns: 0 on success
59  *           errno on failure
60  */
61 int workq_init(workq_t *wq, int threads, void (*engine)(void *arg))
62 {
63    int stat;
64                         
65    if ((stat = pthread_attr_init(&wq->attr)) != 0) {
66       return stat;
67    }
68    if ((stat = pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
69       pthread_attr_destroy(&wq->attr);
70       return stat;
71    }
72    if ((stat = pthread_mutex_init(&wq->mutex, NULL)) != 0) {
73       pthread_attr_destroy(&wq->attr);
74       return stat;
75    }
76    if ((stat = pthread_cond_init(&wq->work, NULL)) != 0) {
77       pthread_mutex_destroy(&wq->mutex);
78       pthread_attr_destroy(&wq->attr);
79       return stat;
80    }
81    wq->quit = 0;
82    wq->first = wq->last = NULL;
83    wq->max_workers = threads;         /* max threads to create */
84    wq->num_workers = 0;               /* no threads yet */
85    wq->idle_workers = 0;              /* no idle threads */
86    wq->engine = engine;               /* routine to run */
87    wq->valid = WORKQ_VALID; 
88    return 0;
89 }
90
91 /*
92  * Destroy a work queue
93  *
94  * Returns: 0 on success
95  *          errno on failure
96  */
97 int workq_destroy(workq_t *wq)
98 {
99    int stat, stat1, stat2;
100
101   if (wq->valid != WORKQ_VALID) {
102      return EINVAL;
103   }
104   if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
105      return stat;
106   }
107   wq->valid = 0;                      /* prevent any more operations */
108
109   /* 
110    * If any threads are active, wake them 
111    */
112   if (wq->num_workers > 0) {
113      wq->quit = 1;
114      if (wq->idle_workers) {
115         if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
116            pthread_mutex_unlock(&wq->mutex);
117            return stat;
118         }
119      }
120      while (wq->num_workers > 0) {
121         if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
122            pthread_mutex_unlock(&wq->mutex);
123            return stat;
124         }
125      }
126   }
127   if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
128      return stat;
129   }
130   stat  = pthread_mutex_destroy(&wq->mutex);
131   stat1 = pthread_cond_destroy(&wq->work);
132   stat2 = pthread_attr_destroy(&wq->attr);
133   return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
134 }
135
136
137 /*
138  *  Add work to a queue
139  */
140 int workq_add(workq_t *wq, void *element)
141 {
142    int stat;
143    workq_ele_t *item;
144    pthread_t id;
145     
146    Dmsg0(200, "workq_add\n");
147    if (wq->valid != WORKQ_VALID) {
148       return EINVAL;
149    }
150
151    if ((item = (workq_ele_t *) malloc(sizeof(workq_ele_t))) == NULL) {
152       return ENOMEM;
153    }
154    item->data = element;
155    item->next = NULL;
156    if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
157       free(item);
158       return stat;
159    }
160
161    Dmsg0(200, "add item to queue\n");
162    /* Add the new item to the end of the queue */
163    if (wq->first == NULL) {
164       wq->first = item;
165    } else {
166       wq->last->next = item;
167    }
168    wq->last = item;
169
170    /* if any threads are idle, wake one */
171    if (wq->idle_workers > 0) {
172       Dmsg0(200, "Signal worker\n");
173       if ((stat = pthread_cond_signal(&wq->work)) != 0) {
174          pthread_mutex_unlock(&wq->mutex);
175          return stat;
176       }
177    } else if (wq->num_workers < wq->max_workers) {
178       Dmsg0(200, "Create worker thread\n");
179       /* No idle threads so create a new one */
180       set_thread_concurrency(wq->max_workers + 1);
181       if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
182          pthread_mutex_unlock(&wq->mutex);
183          return stat;
184       }
185       wq->num_workers++;
186    }
187    pthread_mutex_unlock(&wq->mutex);
188    Dmsg0(200, "Return workq_add\n");
189    return stat;
190 }
191
192 /* 
193  * This is the worker thread that serves the work queue.
194  * In due course, it will call the user's engine.
195  */
196 static void *workq_server(void *arg)
197 {
198    struct timespec timeout;
199    workq_t *wq = (workq_t *)arg;
200    workq_ele_t *we;
201    int stat, timedout;
202
203    Dmsg0(200, "Start workq_server\n");
204    if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
205       return NULL;
206    }
207
208    for (;;) {
209       struct timeval tv;
210       struct timezone tz;
211
212       Dmsg0(200, "Top of for loop\n");
213       timedout = 0;
214       Dmsg0(200, "gettimeofday()\n");
215       gettimeofday(&tv, &tz);
216       timeout.tv_nsec = 0;
217       timeout.tv_sec = tv.tv_sec + 2;
218
219       while (wq->first == NULL && !wq->quit) {
220          /*
221           * Wait 2 seconds, then if no more work, exit
222           */
223          Dmsg0(200, "pthread_cond_timedwait()\n");
224 #ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN
225          /* CYGWIN dies with a page fault the second
226           * time that pthread_cond_timedwait() is called
227           * so fake it out.
228           */
229          pthread_mutex_lock(&wq->mutex);
230          stat = ETIMEDOUT;
231 #else
232          stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
233 #endif
234          Dmsg1(200, "timedwait=%d\n", stat);
235          if (stat == ETIMEDOUT) {
236             timedout = 1;
237             break;
238          } else if (stat != 0) {
239             /* This shouldn't happen */
240             Dmsg0(200, "This shouldn't happen\n");
241             wq->num_workers--;
242             pthread_mutex_unlock(&wq->mutex);
243             return NULL;
244          }
245       } 
246       we = wq->first;
247       if (we != NULL) {
248          wq->first = we->next;
249          if (wq->last == we) {
250             wq->last = NULL;
251          }
252          if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
253             return NULL;
254          }
255          /* Call user's routine here */
256          Dmsg0(200, "Calling user engine.\n");
257          wq->engine(we->data);
258          Dmsg0(200, "Back from user engine.\n");
259          free(we);                    /* release work entry */
260          Dmsg0(200, "relock mutex\n"); 
261          if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
262             return NULL;
263          }
264          Dmsg0(200, "Done lock mutex\n");
265       }
266       /*
267        * If no more work request, and we are asked to quit, then do it
268        */
269       if (wq->first == NULL && wq->quit) {
270          wq->num_workers--;
271          if (wq->num_workers == 0) {
272             Dmsg0(200, "Wake up destroy routine\n");
273             /* Wake up destroy routine if he is waiting */
274             pthread_cond_broadcast(&wq->work);
275          }
276          Dmsg0(200, "Unlock mutex\n");
277          pthread_mutex_unlock(&wq->mutex);
278          Dmsg0(200, "Return from workq_server\n");
279          return NULL;
280       }
281       Dmsg0(200, "Check for work request\n");
282       /* 
283        * If no more work requests, and we waited long enough, quit
284        */
285       Dmsg1(200, "wq->first==NULL = %d\n", wq->first==NULL);
286       Dmsg1(200, "timedout=%d\n", timedout);
287       if (wq->first == NULL && timedout) {
288          Dmsg0(200, "break big loop\n");
289          wq->num_workers--;
290          break;
291       }
292       Dmsg0(200, "Loop again\n");
293    } /* end of big for loop */
294
295    Dmsg0(200, "unlock mutex\n");
296    pthread_mutex_unlock(&wq->mutex);
297    Dmsg0(200, "End workq_server\n");
298    return NULL;
299 }