2 * Bacula work queue routines. Permits passing work to
5 * Kern Sibbald, January MMI
9 * This code adapted from "Programming with POSIX Threads", by
14 * static workq_t job_wq; define work queue
17 * if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
18 * Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", strerror(errno));
21 * Add an item to the queue
22 * if ((stat = workq_add(&job_wq, (void *)jcr)) != 0) {
23 * Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", strerror(errno));
27 * workq_destroy(workq_t *wq);
31 Copyright (C) 2000, 2001, 2002 Kern Sibbald and John Walker
33 This program is free software; you can redistribute it and/or
34 modify it under the terms of the GNU General Public License as
35 published by the Free Software Foundation; either version 2 of
36 the License, or (at your option) any later version.
38 This program is distributed in the hope that it will be useful,
39 but WITHOUT ANY WARRANTY; without even the implied warranty of
40 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
41 General Public License for more details.
43 You should have received a copy of the GNU General Public
44 License along with this program; if not, write to the Free
45 Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
52 /* Forward referenced functions */
/* Worker thread start routine; workq_add() hands it to pthread_create(). */
53 static void *workq_server(void *arg);
56 * Initialize a work queue
58 * Returns: 0 on success
/*
 * workq_init() - prepare the work queue wq for use.
 *
 *   wq       queue object to set up
 *   threads  upper bound on worker threads (stored in wq->max_workers)
 *   engine   routine a worker calls with the data pointer of each queued item
 *
 * The thread attribute object, the mutex and the condition variable are
 * initialized in that order.  When a later step fails, the objects that
 * were already initialized are destroyed again.
 *
 * NOTE(review): the return statements are not visible in this listing, so
 * the exact value handed back on each path should be confirmed against the
 * complete file.
 */
61 int workq_init(workq_t *wq, int threads, void (*engine)(void *arg))
/* Attribute object later passed to pthread_create(); workers run detached. */
65 if ((stat = pthread_attr_init(&wq->attr)) != 0) {
68 if ((stat = pthread_attr_setdetachstate(&wq->attr, PTHREAD_CREATE_DETACHED)) != 0) {
69 pthread_attr_destroy(&wq->attr);
/* Mutex guarding every field of the queue. */
72 if ((stat = pthread_mutex_init(&wq->mutex, NULL)) != 0) {
73 pthread_attr_destroy(&wq->attr);
/* Condition used both to wake idle workers and to wake workq_destroy(). */
76 if ((stat = pthread_cond_init(&wq->work, NULL)) != 0) {
77 pthread_mutex_destroy(&wq->mutex);
78 pthread_attr_destroy(&wq->attr);
/* All primitives are ready: start with an empty list and no workers. */
82 wq->first = wq->last = NULL;
83 wq->max_workers = threads; /* max threads to create */
84 wq->num_workers = 0; /* no threads yet */
85 wq->idle_workers = 0; /* no idle threads */
86 wq->engine = engine; /* routine to run */
/* Marker tested by workq_add() and workq_destroy() before they touch wq. */
87 wq->valid = WORKQ_VALID;
92 * Destroy a work queue
94 * Returns: 0 on success
/*
 * workq_destroy() - shut down the work queue wq and release its pthread
 * objects.
 *
 * With the mutex held the queue is marked invalid.  If workers exist, idle
 * ones are woken by a broadcast on wq->work and the caller then waits on
 * that same condition until wq->num_workers has dropped to zero.  After
 * unlocking, the mutex, the condition variable and the attribute object
 * are destroyed.
 *
 * The final return yields the first non-zero status of the three destroy
 * calls, or the status of the last one when the first two succeeded.
 *
 * NOTE(review): the statements executed when the validity check or a
 * pthread call fails are not visible in this listing; confirm the early
 * return values against the complete file.
 */
97 int workq_destroy(workq_t *wq)
99 int stat, stat1, stat2;
/* Refuse to operate on a queue that does not carry the WORKQ_VALID marker. */
101 if (wq->valid != WORKQ_VALID) {
104 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
107 wq->valid = 0; /* prevent any more operations */
110 * If any threads are active, wake them
112 if (wq->num_workers > 0) {
/* Only idle workers are blocked on wq->work, so only they need a wakeup. */
114 if (wq->idle_workers) {
115 if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
116 pthread_mutex_unlock(&wq->mutex);
/*
 * Sleep until the last worker is gone; workq_server() broadcasts on
 * wq->work once num_workers reaches 0.
 */
120 while (wq->num_workers > 0) {
121 if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
122 pthread_mutex_unlock(&wq->mutex);
127 if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
/* Tear down all three objects even if one fails, then report the first error. */
130 stat = pthread_mutex_destroy(&wq->mutex);
131 stat1 = pthread_cond_destroy(&wq->work);
132 stat2 = pthread_attr_destroy(&wq->attr);
133 return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2));
138 * Add work to a queue
/*
 * workq_add() - queue one work element for a worker thread.
 *
 *   wq       work queue; must carry the WORKQ_VALID marker
 *   element  opaque pointer stored in the new entry and later passed to
 *            the engine routine by a worker
 *
 * A workq_ele_t is allocated with malloc() to carry element (the worker
 * frees it after the engine returns).  With the mutex held the entry is
 * added to the end of the list.  If a worker is idle it is signalled;
 * otherwise, while num_workers is below max_workers, a new worker thread
 * is created using the attribute object prepared by workq_init().
 *
 * NOTE(review): the local declarations, the statements that link the entry
 * into the list, and the error returns are not visible in this listing;
 * confirm them against the complete file.
 */
140 int workq_add(workq_t *wq, void *element)
146 Dmsg0(200, "workq_add\n");
147 if (wq->valid != WORKQ_VALID) {
/* Allocate the list node before taking the lock. */
151 if ((item = (workq_ele_t *) malloc(sizeof(workq_ele_t))) == NULL) {
154 item->data = element;
156 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
161 Dmsg0(200, "add item to queue\n");
162 /* Add the new item to the end of the queue */
163 if (wq->first == NULL) {
166 wq->last->next = item;
170 /* if any threads are idle, wake one */
171 if (wq->idle_workers > 0) {
172 Dmsg0(200, "Signal worker\n");
173 if ((stat = pthread_cond_signal(&wq->work)) != 0) {
174 pthread_mutex_unlock(&wq->mutex);
177 } else if (wq->num_workers < wq->max_workers) {
178 Dmsg0(200, "Create worker thread\n");
179 /* No idle threads so create a new one */
/* NOTE(review): set_thread_concurrency() is defined elsewhere; presumably a concurrency hint - confirm. */
180 set_thread_concurrency(wq->max_workers + 1);
181 if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
182 pthread_mutex_unlock(&wq->mutex);
/* If all workers are busy and the cap is reached, the entry simply waits in the list. */
187 pthread_mutex_unlock(&wq->mutex);
188 Dmsg0(200, "Return workq_add\n");
193 * This is the worker thread that serves the work queue.
194 * In due course, it will call the user's engine.
/*
 * workq_server() - start routine of a worker thread.
 *
 *   arg  the workq_t this worker serves, passed as void *
 *
 * The worker locks wq->mutex and then loops.  Each pass builds a deadline
 * two seconds past the current time of day and waits on wq->work for as
 * long as the list is empty and wq->quit is not set.  When an entry is
 * available it is unlinked from the head of the list, the mutex is
 * released, wq->engine is called with the data pointer of the entry, the
 * entry is freed and the mutex is taken again.
 *
 * The worker finishes when the list is empty and either wq->quit is set
 * or the timed wait expired.  On the quit path, a worker that finds
 * num_workers at 0 broadcasts on wq->work so that a waiting
 * workq_destroy() can proceed.
 *
 * The #ifdef section guards a CYGWIN workaround; the macro name has been
 * altered (..._was_HAVE_CYGWIN), so it is apparently disabled - confirm.
 *
 * NOTE(review): the loop header, the assignments to we and timedout, the
 * worker-count updates and the return statements are not visible in this
 * listing; confirm them against the complete file.
 */
196 static void *workq_server(void *arg)
198 struct timespec timeout;
199 workq_t *wq = (workq_t *)arg;
203 Dmsg0(200, "Start workq_server\n");
204 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
212 Dmsg0(200, "Top of for loop\n");
214 Dmsg0(200, "gettimeofday()\n");
/* Take the current time as the base of the deadline for the timed wait. */
215 gettimeofday(&tv, &tz);
217 timeout.tv_sec = tv.tv_sec + 2;
/* Idle wait: repeat until there is work or a shutdown has been requested. */
219 while (wq->first == NULL && !wq->quit) {
221 * Wait 2 seconds, then if no more work, exit
223 Dmsg0(200, "pthread_cond_timedwait()\n");
224 #ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN
225 /* CYGWIN dies with a page fault the second
226 * time that pthread_cond_timedwait() is called
229 pthread_mutex_lock(&wq->mutex);
232 stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
234 Dmsg1(200, "timedwait=%d\n", stat);
235 if (stat == ETIMEDOUT) {
238 } else if (stat != 0) {
239 /* This shouldn't happen */
240 Dmsg0(200, "This shouldn't happen\n");
242 pthread_mutex_unlock(&wq->mutex);
/* Unlink the entry we from the head of the list. */
248 wq->first = we->next;
249 if (wq->last == we) {
/* Release the lock so other threads can use the queue while the engine runs. */
252 if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
255 /* Call user's routine here */
256 Dmsg0(200, "Calling user engine.\n");
257 wq->engine(we->data);
258 Dmsg0(200, "Back from user engine.\n");
259 free(we); /* release work entry */
260 Dmsg0(200, "relock mutex\n");
261 if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
264 Dmsg0(200, "Done lock mutex\n");
267 * If no more work request, and we are asked to quit, then do it
269 if (wq->first == NULL && wq->quit) {
/* workq_destroy() waits on wq->work until num_workers is 0. */
271 if (wq->num_workers == 0) {
272 Dmsg0(200, "Wake up destroy routine\n");
273 /* Wake up destroy routine if he is waiting */
274 pthread_cond_broadcast(&wq->work);
276 Dmsg0(200, "Unlock mutex\n");
277 pthread_mutex_unlock(&wq->mutex);
278 Dmsg0(200, "Return from workq_server\n");
281 Dmsg0(200, "Check for work request\n");
283 * If no more work requests, and we waited long enough, quit
285 Dmsg1(200, "wq->first==NULL = %d\n", wq->first==NULL);
286 Dmsg1(200, "timedout=%d\n", timedout);
/* Idle timeout: nothing arrived before the deadline, so this worker retires. */
287 if (wq->first == NULL && timedout) {
288 Dmsg0(200, "break big loop\n");
292 Dmsg0(200, "Loop again\n");
293 } /* end of big for loop */
/* Normal exit path: the mutex is still held here and must be released. */
295 Dmsg0(200, "unlock mutex\n");
296 pthread_mutex_unlock(&wq->mutex);
297 Dmsg0(200, "End workq_server\n");