X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=bacula%2Fsrc%2Flib%2Fworkq.c;h=dbe0001ffff78551386943c16b80cbad422a56da;hb=ed89319bc77d781ff5bebba3fd48593aab15ef53;hp=3ccc663923a54d565280251d839472df2dffb25e;hpb=82151d934f8b46ff7133b084808f288c81edd372;p=bacula%2Fbacula diff --git a/bacula/src/lib/workq.c b/bacula/src/lib/workq.c index 3ccc663923..dbe0001fff 100755 --- a/bacula/src/lib/workq.c +++ b/bacula/src/lib/workq.c @@ -56,12 +56,12 @@ static void *workq_server(void *arg); * Initialize a work queue * * Returns: 0 on success - * errno on failure + * errno on failure */ -int workq_init(workq_t *wq, int threads, void (*engine)(void *arg)) +int workq_init(workq_t *wq, int threads, void *(*engine)(void *arg)) { int stat; - + if ((stat = pthread_attr_init(&wq->attr)) != 0) { return stat; } @@ -80,10 +80,10 @@ int workq_init(workq_t *wq, int threads, void (*engine)(void *arg)) } wq->quit = 0; wq->first = wq->last = NULL; - wq->max_workers = threads; /* max threads to create */ - wq->num_workers = 0; /* no threads yet */ - wq->idle_workers = 0; /* no idle threads */ - wq->engine = engine; /* routine to run */ + wq->max_workers = threads; /* max threads to create */ + wq->num_workers = 0; /* no threads yet */ + wq->idle_workers = 0; /* no idle threads */ + wq->engine = engine; /* routine to run */ wq->valid = WORKQ_VALID; return 0; } @@ -92,7 +92,7 @@ int workq_init(workq_t *wq, int threads, void (*engine)(void *arg)) * Destroy a work queue * * Returns: 0 on success - * errno on failure + * errno on failure */ int workq_destroy(workq_t *wq) { @@ -104,7 +104,7 @@ int workq_destroy(workq_t *wq) if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) { return stat; } - wq->valid = 0; /* prevent any more operations */ + wq->valid = 0; /* prevent any more operations */ /* * If any threads are active, wake them @@ -112,22 +112,22 @@ int workq_destroy(workq_t *wq) if (wq->num_workers > 0) { wq->quit = 1; if (wq->idle_workers) { - if ((stat = pthread_cond_broadcast(&wq->work)) != 0) { - pthread_mutex_unlock(&wq->mutex); - return stat; - } + if ((stat = pthread_cond_broadcast(&wq->work)) != 0) { + pthread_mutex_unlock(&wq->mutex); + return stat; + } } while (wq->num_workers > 0) { - if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) { - pthread_mutex_unlock(&wq->mutex); - return stat; - } + if ((stat = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) { + pthread_mutex_unlock(&wq->mutex); + return stat; + } } } if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) { return stat; } - stat = pthread_mutex_destroy(&wq->mutex); + stat = pthread_mutex_destroy(&wq->mutex); stat1 = pthread_cond_destroy(&wq->work); stat2 = pthread_attr_destroy(&wq->attr); return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2)); @@ -138,10 +138,10 @@ int workq_destroy(workq_t *wq) * Add work to a queue * wq is a queue that was created with workq_init * element is a user unique item that will be passed to the - * processing routine + * processing routine * work_item will get internal work queue item -- if it is not NULL * priority if non-zero will cause the item to be placed on the - * head of the list instead of the tail. + * head of the list instead of the tail. */ int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority) { @@ -168,18 +168,18 @@ int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority) if (priority) { /* Add to head of queue */ if (wq->first == NULL) { - wq->first = item; - wq->last = item; + wq->first = item; + wq->last = item; } else { - item->next = wq->first; - wq->first = item; + item->next = wq->first; + wq->first = item; } } else { /* Add to end of queue */ if (wq->first == NULL) { - wq->first = item; + wq->first = item; } else { - wq->last->next = item; + wq->last->next = item; } wq->last = item; } @@ -188,16 +188,16 @@ int workq_add(workq_t *wq, void *element, workq_ele_t **work_item, int priority) if (wq->idle_workers > 0) { Dmsg0(200, "Signal worker\n"); if ((stat = pthread_cond_signal(&wq->work)) != 0) { - pthread_mutex_unlock(&wq->mutex); - return stat; + pthread_mutex_unlock(&wq->mutex); + return stat; } } else if (wq->num_workers < wq->max_workers) { Dmsg0(200, "Create worker thread\n"); /* No idle threads so create a new one */ set_thread_concurrency(wq->max_workers + 1); if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) { - pthread_mutex_unlock(&wq->mutex); - return stat; + pthread_mutex_unlock(&wq->mutex); + return stat; } wq->num_workers++; } @@ -236,8 +236,8 @@ int workq_remove(workq_t *wq, workq_ele_t *work_item) for (prev=item=wq->first; item; item=item->next) { if (item == work_item) { - found = 1; - break; + found = 1; + break; } prev = item; } @@ -249,7 +249,7 @@ int workq_remove(workq_t *wq, workq_ele_t *work_item) if (wq->first != work_item) { prev->next = work_item->next; if (wq->last == work_item) { - wq->last = prev; + wq->last = prev; } work_item->next = wq->first; wq->first = work_item; @@ -259,16 +259,16 @@ int workq_remove(workq_t *wq, workq_ele_t *work_item) if (wq->idle_workers > 0) { Dmsg0(200, "Signal worker\n"); if ((stat = pthread_cond_signal(&wq->work)) != 0) { - pthread_mutex_unlock(&wq->mutex); - return stat; + pthread_mutex_unlock(&wq->mutex); + return stat; } } else { Dmsg0(200, "Create worker thread\n"); /* No idle threads so create a new one */ set_thread_concurrency(wq->max_workers + 1); if ((stat = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) { - pthread_mutex_unlock(&wq->mutex); - return stat; + pthread_mutex_unlock(&wq->mutex); + return stat; } wq->num_workers++; } @@ -306,66 +306,66 @@ static void *workq_server(void *arg) timeout.tv_sec = tv.tv_sec + 2; while (wq->first == NULL && !wq->quit) { - /* - * Wait 2 seconds, then if no more work, exit - */ + /* + * Wait 2 seconds, then if no more work, exit + */ Dmsg0(200, "pthread_cond_timedwait()\n"); #ifdef xxxxxxxxxxxxxxxx_was_HAVE_CYGWIN - /* CYGWIN dies with a page fault the second - * time that pthread_cond_timedwait() is called - * so fake it out. - */ - pthread_mutex_lock(&wq->mutex); - stat = ETIMEDOUT; + /* CYGWIN dies with a page fault the second + * time that pthread_cond_timedwait() is called + * so fake it out. + */ + pthread_mutex_lock(&wq->mutex); + stat = ETIMEDOUT; #else - stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout); + stat = pthread_cond_timedwait(&wq->work, &wq->mutex, &timeout); #endif Dmsg1(200, "timedwait=%d\n", stat); - if (stat == ETIMEDOUT) { - timedout = 1; - break; - } else if (stat != 0) { + if (stat == ETIMEDOUT) { + timedout = 1; + break; + } else if (stat != 0) { /* This shouldn't happen */ Dmsg0(200, "This shouldn't happen\n"); - wq->num_workers--; - pthread_mutex_unlock(&wq->mutex); - return NULL; - } + wq->num_workers--; + pthread_mutex_unlock(&wq->mutex); + return NULL; + } } we = wq->first; if (we != NULL) { - wq->first = we->next; - if (wq->last == we) { - wq->last = NULL; - } - if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) { - return NULL; - } + wq->first = we->next; + if (wq->last == we) { + wq->last = NULL; + } + if ((stat = pthread_mutex_unlock(&wq->mutex)) != 0) { + return NULL; + } /* Call user's routine here */ Dmsg0(200, "Calling user engine.\n"); - wq->engine(we->data); + wq->engine(we->data); Dmsg0(200, "Back from user engine.\n"); - free(we); /* release work entry */ + free(we); /* release work entry */ Dmsg0(200, "relock mutex\n"); - if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) { - return NULL; - } + if ((stat = pthread_mutex_lock(&wq->mutex)) != 0) { + return NULL; + } Dmsg0(200, "Done lock mutex\n"); } /* * If no more work request, and we are asked to quit, then do it */ if (wq->first == NULL && wq->quit) { - wq->num_workers--; - if (wq->num_workers == 0) { + wq->num_workers--; + if (wq->num_workers == 0) { Dmsg0(200, "Wake up destroy routine\n"); - /* Wake up destroy routine if he is waiting */ - pthread_cond_broadcast(&wq->work); - } + /* Wake up destroy routine if he is waiting */ + pthread_cond_broadcast(&wq->work); + } Dmsg0(200, "Unlock mutex\n"); - pthread_mutex_unlock(&wq->mutex); + pthread_mutex_unlock(&wq->mutex); Dmsg0(200, "Return from workq_server\n"); - return NULL; + return NULL; } Dmsg0(200, "Check for work request\n"); /* @@ -375,8 +375,8 @@ static void *workq_server(void *arg) Dmsg1(200, "timedout=%d\n", timedout); if (wq->first == NULL && timedout) { Dmsg0(200, "break big loop\n"); - wq->num_workers--; - break; + wq->num_workers--; + break; } Dmsg0(200, "Loop again\n"); } /* end of big for loop */