+/*
+ Bacula® - The Network Backup Solution
+
+ Copyright (C) 2001-2011 Free Software Foundation Europe e.V.
+
+ The main author of Bacula is Kern Sibbald, with contributions from
+ many others, a complete list can be found in the file AUTHORS.
+ This program is Free Software; you can redistribute it and/or
+ modify it under the terms of version three of the GNU Affero General Public
+ License as published by the Free Software Foundation and included
+ in the file LICENSE.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ 02110-1301, USA.
+
+ Bacula® is a registered trademark of Kern Sibbald.
+ The licensor of Bacula is the Free Software Foundation Europe
+ (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
+ Switzerland, email:ftf@fsfeurope.org.
+*/
/*
* Bacula work queue routines. Permits passing work to
* multiple threads.
*
* Initialize queue
* if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) {
- * Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", strerror(errno));
+ * berrno be;
+ * Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", be.bstrerror(errno));
* }
*
* Add an item to the queue
* if ((stat = workq_add(&job_wq, (void *)jcr)) != 0) {
- * Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", strerror(errno));
+ * berrno be;
+ * Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", be.bstrerror(errno));
* }
*
* Terminate the queue
* workq_destroy(workq_t *wq);
*
*/
-/*
- Bacula® - The Network Backup Solution
-
- Copyright (C) 2000-2006 Free Software Foundation Europe e.V.
-
- The main author of Bacula is Kern Sibbald, with contributions from
- many others, a complete list can be found in the file AUTHORS.
- This program is Free Software; you can redistribute it and/or
- modify it under the terms of version two of the GNU General Public
- License as published by the Free Software Foundation plus additions
- that are listed in the file LICENSE.
-
- This program is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- 02110-1301, USA.
-
- Bacula® is a registered trademark of John Walker.
- The licensor of Bacula is the Free Software Foundation Europe
- (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
- Switzerland, email:ftf@fsfeurope.org.
-*/
#include "bacula.h"
+#include "jcr.h"
/* Forward referenced functions */
extern "C" void *workq_server(void *arg);
if (wq->valid != WORKQ_VALID) {
return EINVAL;
}
- if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
- return stat;
- }
+ P(wq->mutex);
wq->valid = 0; /* prevent any more operations */
/*
wq->quit = 1;
if (wq->idle_workers) {
if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
- pthread_mutex_unlock(&wq->mutex);
+ V(wq->mutex);
return stat;
}
}
while (wq->num_workers > 0) {
if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
- pthread_mutex_unlock(&wq->mutex);
+ V(wq->mutex);
return stat;
}
}
}
- if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
- return stat;
- }
+ V(wq->mutex);
stat = pthread_mutex_destroy(&wq->mutex);
stat1 = pthread_cond_destroy(&wq->work);
stat2 = pthread_attr_destroy(&wq->attr);
*/
int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
{
- int stat;
+ int stat=0;
workq_ele_t *item;
pthread_t id;
}
item->data = element;
item->next = NULL;
- if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
- free(item);
- return stat;
- }
+ P(wq->mutex);
Dmsg0(1400, "add item to queue\n");
if (priority) {
if (wq->idle_workers > 0) {
Dmsg0(1400, "Signal worker\n");
if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
- pthread_mutex_unlock(&wq->mutex);
+ V(wq->mutex);
return stat;
}
} else if (wq->num_workers < wq->max_workers) {
/* No idle threads so create a new one */
set_thread_concurrency(wq->max_workers + 1);
if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
- pthread_mutex_unlock(&wq->mutex);
+ V(wq->mutex);
return stat;
}
wq->num_workers++;
}
- pthread_mutex_unlock(&wq->mutex);
+ V(wq->mutex);
Dmsg0(1400, "Return workq_add\n");
/* Return work_item if requested */
if (work_item) {
return EINVAL;
}
- if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
- return stat;
- }
+ P(wq->mutex);
for (prev=item=wq->first; item; item=item->next) {
if (item == work_item) {
if (wq->idle_workers > 0) {
Dmsg0(1400, "Signal worker\n");
if ((stat = pthread_cond_broadcast(&wq->work)) != 0) {
- pthread_mutex_unlock(&wq->mutex);
+ V(wq->mutex);
return stat;
}
} else {
/* No idle threads so create a new one */
set_thread_concurrency(wq->max_workers + 1);
if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
- pthread_mutex_unlock(&wq->mutex);
+ V(wq->mutex);
return stat;
}
wq->num_workers++;
}
- pthread_mutex_unlock(&wq->mutex);
+ V(wq->mutex);
Dmsg0(1400, "Return workq_remove\n");
return stat;
}
int stat, timedout;
Dmsg0(1400, "Start workq_server\n");
- if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
- return NULL;
- }
+ P(wq->mutex);
+ set_jcr_in_tsd(INVALID_JCR);
for (;;) {
struct timeval tv;
* time that pthread_cond_timedwait() is called
* so fake it out.
*/
- pthread_mutex_lock(&wq->mutex);
+ P(wq->mutex);
stat = ETIMEDOUT;
#else
stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout);
/* This shouldn't happen */
Dmsg0(1400, "This shouldn't happen\n");
wq->num_workers--;
- pthread_mutex_unlock(&wq->mutex);
+ V(wq->mutex);
return NULL;
}
}
if (wq->last == we) {
wq->last = NULL;
}
- if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) {
- return NULL;
- }
+ V(wq->mutex);
/* Call user's routine here */
Dmsg0(1400, "Calling user engine.\n");
wq->engine(we->data);
Dmsg0(1400, "Back from user engine.\n");
free(we); /* release work entry */
Dmsg0(1400, "relock mutex\n");
- if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) {
- return NULL;
- }
+ P(wq->mutex);
Dmsg0(1400, "Done lock mutex\n");
}
/*
pthread_cond_broadcast(&wq->work);
}
Dmsg0(1400, "Unlock mutex\n");
- pthread_mutex_unlock(&wq->mutex);
+ V(wq->mutex);
Dmsg0(1400, "Return from workq_server\n");
return NULL;
}
} /* end of big for loop */
Dmsg0(1400, "unlock mutex\n");
- pthread_mutex_unlock(&wq->mutex);
+ V(wq->mutex);
Dmsg0(1400, "End workq_server\n");
return NULL;
}