X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=bacula%2Fsrc%2Flib%2Fworkq.c;h=2744d23c92e7f25a4f7ee1b7bd3f2b853c93e4a4;hb=a84ce1aa4c55f255e02a1c5f76c1e0284459c3b1;hp=4611e6ce541de672eef30fa63ccafa2a589c5230;hpb=f4fadeaf83a912e0a19c19eacd0c7113b21f0a67;p=bacula%2Fbacula diff --git a/bacula/src/lib/workq.c b/bacula/src/lib/workq.c old mode 100755 new mode 100644 index 4611e6ce54..2744d23c92 --- a/bacula/src/lib/workq.c +++ b/bacula/src/lib/workq.c @@ -1,3 +1,30 @@ +/* + Bacula® - The Network Backup Solution + + Copyright (C) 2001-2008 Free Software Foundation Europe e.V. + + The main author of Bacula is Kern Sibbald, with contributions from + many others, a complete list can be found in the file AUTHORS. + This program is Free Software; you can redistribute it and/or + modify it under the terms of version two of the GNU General Public + License as published by the Free Software Foundation and included + in the file LICENSE. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301, USA. + + Bacula® is a registered trademark of Kern Sibbald. + The licensor of Bacula is the Free Software Foundation Europe + (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich, + Switzerland, email:ftf@fsfeurope.org. +*/ /* * Bacula work queue routines. Permits passing work to * multiple threads. @@ -15,37 +42,20 @@ * * Initialize queue * if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) { - * Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", strerror(errno)); + * berrno be; + * Emsg1(M_ABORT, 0, "Could not init job work queue: ERR=%s\n", be.bstrerror(errno)); * } * * Add an item to the queue * if ((stat = workq_add(&job_wq, (void *)jcr)) != 0) { - * Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", strerror(errno)); + * berrno be; + * Emsg1(M_ABORT, 0, "Could not add job to work queue: ERR=%s\n", be.bstrerror(errno)); * } * * Terminate the queue * workq_destroy(workq_t *wq); * */ -/* - Copyright (C) 2000-2004 Kern Sibbald and John Walker - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License as - published by the Free Software Foundation; either version 2 of - the License, or (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public - License along with this program; if not, write to the Free - Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, - MA 02111-1307, USA. - - */ #include "bacula.h" @@ -56,7 +66,7 @@ extern "C" void *workq_server(void *arg); * Initialize a work queue * * Returns: 0 on success - * errno on failure + * errno on failure */ int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg)) { @@ -80,10 +90,10 @@ int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg)) } wq->quit = 0; wq->first = wq->last = NULL; - wq->max_workers = threads; /* max threads to create */ - wq->num_workers = 0; /* no threads yet */ - wq->idle_workers = 0; /* no idle threads */ - wq->engine = engine; /* routine to run */ + wq->max_workers = threads; /* max threads to create */ + wq->num_workers = 0; /* no threads yet */ + wq->idle_workers = 0; /* no idle threads */ + wq->engine = engine; /* routine to run */ wq->valid = WORKQ_VALID; return 0; } @@ -92,7 +102,7 @@ int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg)) * Destroy a work queue * * Returns: 0 on success - * errno on failure + * errno on failure */ int workq_destroy(workq_t *wq) { @@ -104,7 +114,7 @@ int workq_destroy(workq_t *wq) if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) { return stat; } - wq->valid = 0; /* prevent any more operations */ + wq->valid = 0; /* prevent any more operations */ /* * If any threads are active, wake them @@ -112,22 +122,22 @@ int workq_destroy(workq_t *wq) if (wq->num_workers > 0) { wq->quit = 1; if (wq->idle_workers) { - if ((stat = pthread_cond_broadcast(&wq->work)) != 0) { - pthread_mutex_unlock(&wq->mutex); - return stat; - } + if ((stat = pthread_cond_broadcast(&wq->work)) != 0) { + pthread_mutex_unlock(&wq->mutex); + return stat; + } } while (wq->num_workers > 0) { - if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) { - pthread_mutex_unlock(&wq->mutex); - return stat; - } + if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) { + pthread_mutex_unlock(&wq->mutex); + return stat; + } } } if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) { return stat; } - stat = pthread_mutex_destroy(&wq->mutex); + stat = pthread_mutex_destroy(&wq->mutex); stat1 = pthread_cond_destroy(&wq->work); stat2 = pthread_attr_destroy(&wq->attr); return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2)); @@ -138,10 +148,10 @@ int workq_destroy(workq_t *wq) * Add work to a queue * wq is a queue that was created with workq_init * element is a user unique item that will be passed to the - * processing routine + * processing routine * work_item will get internal work queue item -- if it is not NULL * priority if non-zero will cause the item to be placed on the - * head of the list instead of the tail. + * head of the list instead of the tail. */ int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority) { @@ -149,7 +159,7 @@ int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority) workq_ele_t *item; pthread_t id; - Dmsg0(400, "workq_add\n"); + Dmsg0(1400, "workq_add\n"); if (wq->valid != WORKQ_VALID) { return EINVAL; } @@ -164,45 +174,45 @@ int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority) return stat; } - Dmsg0(400, "add item to queue\n"); + Dmsg0(1400, "add item to queue\n"); if (priority) { /* Add to head of queue */ if (wq->first == NULL) { - wq->first = item; - wq->last = item; + wq->first = item; + wq->last = item; } else { - item->next = wq->first; - wq->first = item; + item->next = wq->first; + wq->first = item; } } else { /* Add to end of queue */ if (wq->first == NULL) { - wq->first = item; + wq->first = item; } else { - wq->last->next = item; + wq->last->next = item; } wq->last = item; } /* if any threads are idle, wake one */ if (wq->idle_workers > 0) { - Dmsg0(400, "Signal worker\n"); - if ((stat = pthread_cond_signal(&wq->work)) != 0) { - pthread_mutex_unlock(&wq->mutex); - return stat; + Dmsg0(1400, "Signal worker\n"); + if ((stat = pthread_cond_broadcast(&wq->work)) != 0) { + pthread_mutex_unlock(&wq->mutex); + return stat; } } else if (wq->num_workers < wq->max_workers) { - Dmsg0(400, "Create worker thread\n"); + Dmsg0(1400, "Create worker thread\n"); /* No idle threads so create a new one */ set_thread_concurrency(wq->max_workers + 1); if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) { - pthread_mutex_unlock(&wq->mutex); - return stat; + pthread_mutex_unlock(&wq->mutex); + return stat; } wq->num_workers++; } pthread_mutex_unlock(&wq->mutex); - Dmsg0(400, "Return workq_add\n"); + Dmsg0(1400, "Return workq_add\n"); /* Return work_item if requested */ if (work_item) { *work_item = item; @@ -225,7 +235,7 @@ int workq_remove(workq_t *wq, workq_ele_t *work_item) pthread_t id; workq_ele_t *item, *prev; - Dmsg0(400, "workq_remove\n"); + Dmsg0(1400, "workq_remove\n"); if (wq->valid != WORKQ_VALID) { return EINVAL; } @@ -236,8 +246,8 @@ int workq_remove(workq_t *wq, workq_ele_t *work_item) for (prev=item=wq->first; item; item=item->next) { if (item == work_item) { - found = 1; - break; + found = 1; + break; } prev = item; } @@ -249,7 +259,7 @@ int workq_remove(workq_t *wq, workq_ele_t *work_item) if (wq->first != work_item) { prev->next = work_item->next; if (wq->last == work_item) { - wq->last = prev; + wq->last = prev; } work_item->next = wq->first; wq->first = work_item; @@ -257,23 +267,23 @@ int workq_remove(workq_t *wq, workq_ele_t *work_item) /* if any threads are idle, wake one */ if (wq->idle_workers > 0) { - Dmsg0(400, "Signal worker\n"); - if ((stat = pthread_cond_signal(&wq->work)) != 0) { - pthread_mutex_unlock(&wq->mutex); - return stat; + Dmsg0(1400, "Signal worker\n"); + if ((stat = pthread_cond_broadcast(&wq->work)) != 0) { + pthread_mutex_unlock(&wq->mutex); + return stat; } } else { - Dmsg0(400, "Create worker thread\n"); + Dmsg0(1400, "Create worker thread\n"); /* No idle threads so create a new one */ set_thread_concurrency(wq->max_workers + 1); if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) { - pthread_mutex_unlock(&wq->mutex); - return stat; + pthread_mutex_unlock(&wq->mutex); + return stat; } wq->num_workers++; } pthread_mutex_unlock(&wq->mutex); - Dmsg0(400, "Return workq_remove\n"); + Dmsg0(1400, "Return workq_remove\n"); return stat; } @@ -290,7 +300,7 @@ void *workq_server(void *arg) workq_ele_t *we; int stat, timedout; - Dmsg0(400, "Start workq_server\n"); + Dmsg0(1400, "Start workq_server\n"); if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) { return NULL; } @@ -299,91 +309,91 @@ void *workq_server(void *arg) struct timeval tv; struct timezone tz; - Dmsg0(400, "Top of for loop\n"); + Dmsg0(1400, "Top of for loop\n"); timedout = 0; - Dmsg0(400, "gettimeofday()\n"); + Dmsg0(1400, "gettimeofday()\n"); gettimeofday(&tv, &tz); timeout.tv_nsec = 0; timeout.tv_sec = tv.tv_sec + 2; while (wq->first == NULL && !wq->quit) { - /* - * Wait 2 seconds, then if no more work, exit - */ - Dmsg0(400, "pthread_cond_timedwait()\n"); + /* + * Wait 2 seconds, then if no more work, exit + */ + Dmsg0(1400, "pthread_cond_timedwait()\n"); #ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN - /* CYGWIN dies with a page fault the second - * time that pthread_cond_timedwait() is called - * so fake it out. - */ - pthread_mutex_lock(&wq->mutex); - stat = ETIMEDOUT; + /* CYGWIN dies with a page fault the second + * time that pthread_cond_timedwait() is called + * so fake it out. + */ + pthread_mutex_lock(&wq->mutex); + stat = ETIMEDOUT; #else - stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout); + stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout); #endif - Dmsg1(400, "timedwait=%d\n", stat); - if (stat == ETIMEDOUT) { - timedout = 1; - break; - } else if (stat != 0) { - /* This shouldn't happen */ - Dmsg0(400, "This shouldn't happen\n"); - wq->num_workers--; - pthread_mutex_unlock(&wq->mutex); - return NULL; - } + Dmsg1(1400, "timedwait=%d\n", stat); + if (stat == ETIMEDOUT) { + timedout = 1; + break; + } else if (stat != 0) { + /* This shouldn't happen */ + Dmsg0(1400, "This shouldn't happen\n"); + wq->num_workers--; + pthread_mutex_unlock(&wq->mutex); + return NULL; + } } we = wq->first; if (we != NULL) { - wq->first = we->next; - if (wq->last == we) { - wq->last = NULL; - } - if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) { - return NULL; - } - /* Call user's routine here */ - Dmsg0(400, "Calling user engine.\n"); - wq->engine(we->data); - Dmsg0(400, "Back from user engine.\n"); - free(we); /* release work entry */ - Dmsg0(400, "relock mutex\n"); - if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) { - return NULL; - } - Dmsg0(400, "Done lock mutex\n"); + wq->first = we->next; + if (wq->last == we) { + wq->last = NULL; + } + if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) { + return NULL; + } + /* Call user's routine here */ + Dmsg0(1400, "Calling user engine.\n"); + wq->engine(we->data); + Dmsg0(1400, "Back from user engine.\n"); + free(we); /* release work entry */ + Dmsg0(1400, "relock mutex\n"); + if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) { + return NULL; + } + Dmsg0(1400, "Done lock mutex\n"); } /* * If no more work request, and we are asked to quit, then do it */ if (wq->first == NULL && wq->quit) { - wq->num_workers--; - if (wq->num_workers == 0) { - Dmsg0(400, "Wake up destroy routine\n"); - /* Wake up destroy routine if he is waiting */ - pthread_cond_broadcast(&wq->work); - } - Dmsg0(400, "Unlock mutex\n"); - pthread_mutex_unlock(&wq->mutex); - Dmsg0(400, "Return from workq_server\n"); - return NULL; + wq->num_workers--; + if (wq->num_workers == 0) { + Dmsg0(1400, "Wake up destroy routine\n"); + /* Wake up destroy routine if he is waiting */ + pthread_cond_broadcast(&wq->work); + } + Dmsg0(1400, "Unlock mutex\n"); + pthread_mutex_unlock(&wq->mutex); + Dmsg0(1400, "Return from workq_server\n"); + return NULL; } - Dmsg0(400, "Check for work request\n"); + Dmsg0(1400, "Check for work request\n"); /* * If no more work requests, and we waited long enough, quit */ - Dmsg1(400, "wq->first==NULL = %d\n", wq->first==NULL); - Dmsg1(400, "timedout=%d\n", timedout); + Dmsg1(1400, "wq->first==NULL = %d\n", wq->first==NULL); + Dmsg1(1400, "timedout=%d\n", timedout); if (wq->first == NULL && timedout) { - Dmsg0(400, "break big loop\n"); - wq->num_workers--; - break; + Dmsg0(1400, "break big loop\n"); + wq->num_workers--; + break; } - Dmsg0(400, "Loop again\n"); + Dmsg0(1400, "Loop again\n"); } /* end of big for loop */ - Dmsg0(400, "unlock mutex\n"); + Dmsg0(1400, "unlock mutex\n"); pthread_mutex_unlock(&wq->mutex); - Dmsg0(400, "End workq_server\n"); + Dmsg0(1400, "End workq_server\n"); return NULL; }