From c6e0b63823f177d08c5e7cf8afc677da569bd772 Mon Sep 17 00:00:00 2001 From: Kern Sibbald Date: Sat, 18 Dec 2004 12:45:53 +0000 Subject: [PATCH] - Fix bug 207. jcr use count off by one when manually scheduling jobs. git-svn-id: https://bacula.svn.sourceforge.net/svnroot/bacula/trunk@1765 91ce42f0-d328-0410-95d8-f526ca767f89 --- bacula/src/dird/job.c | 2 +- bacula/src/dird/jobq.c | 130 +++++++++++++++++++++++------------------ 2 files changed, 75 insertions(+), 57 deletions(-) diff --git a/bacula/src/dird/job.c b/bacula/src/dird/job.c index 57d5413876..9fe1ed1ce8 100644 --- a/bacula/src/dird/job.c +++ b/bacula/src/dird/job.c @@ -7,7 +7,7 @@ * Version $Id$ */ /* - Copyright (C) 2000-2004 Kern Sibbald and John Walker + Copyright (C) 2000-2004 Kern Sibbald This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as diff --git a/bacula/src/dird/jobq.c b/bacula/src/dird/jobq.c index dd92d6ff31..1cde7a505c 100755 --- a/bacula/src/dird/jobq.c +++ b/bacula/src/dird/jobq.c @@ -18,7 +18,7 @@ * */ /* - Copyright (C) 2003-2004 Kern Sibbald and John Walker + Copyright (C) 2003-2004 Kern Sibbald This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as @@ -62,7 +62,8 @@ int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg)) jobq_item_t *item = NULL; if ((stat = pthread_attr_init(&jq->attr)) != 0) { - Jmsg1(NULL, M_ERROR, 0, "pthread_attr_init: ERR=%s\n", strerror(stat)); + berrno be; + Jmsg1(NULL, M_ERROR, 0, "pthread_attr_init: ERR=%s\n", be.strerror(stat)); return stat; } if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) { @@ -70,12 +71,14 @@ int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg)) return stat; } if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) { - Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_init: ERR=%s\n", strerror(stat)); + berrno be; + Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_init: ERR=%s\n", be.strerror(stat)); pthread_attr_destroy(&jq->attr); return stat; } if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) { - Jmsg1(NULL, M_ERROR, 0, "pthread_cond_init: ERR=%s\n", strerror(stat)); + berrno be; + Jmsg1(NULL, M_ERROR, 0, "pthread_cond_init: ERR=%s\n", be.strerror(stat)); pthread_mutex_destroy(&jq->mutex); pthread_attr_destroy(&jq->attr); return stat; @@ -103,46 +106,50 @@ int jobq_destroy(jobq_t *jq) { int stat, stat1, stat2; - if (jq->valid != JOBQ_VALID) { - return EINVAL; - } - if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { - Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_lock: ERR=%s\n", strerror(stat)); - return stat; - } - jq->valid = 0; /* prevent any more operations */ - - /* - * If any threads are active, wake them - */ - if (jq->num_workers > 0) { - jq->quit = true; - if (jq->idle_workers) { - if ((stat = pthread_cond_broadcast(&jq->work)) != 0) { - Jmsg1(NULL, M_ERROR, 0, "pthread_cond_broadcast: ERR=%s\n", strerror(stat)); - pthread_mutex_unlock(&jq->mutex); - return stat; - } - } - while (jq->num_workers > 0) { - if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) { - Jmsg1(NULL, M_ERROR, 0, "pthread_cond_wait: ERR=%s\n", strerror(stat)); - pthread_mutex_unlock(&jq->mutex); - return stat; - } - } - } - if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) { - Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_unlock: ERR=%s\n", strerror(stat)); - return stat; - } - stat = pthread_mutex_destroy(&jq->mutex); - stat1 = pthread_cond_destroy(&jq->work); - stat2 = pthread_attr_destroy(&jq->attr); - delete jq->waiting_jobs; - delete jq->running_jobs; - delete jq->ready_jobs; - return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2)); + if (jq->valid != JOBQ_VALID) { + return EINVAL; + } + if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { + berrno be; + Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_lock: ERR=%s\n", be.strerror(stat)); + return stat; + } + jq->valid = 0; /* prevent any more operations */ + + /* + * If any threads are active, wake them + */ + if (jq->num_workers > 0) { + jq->quit = true; + if (jq->idle_workers) { + if ((stat = pthread_cond_broadcast(&jq->work)) != 0) { + berrno be; + Jmsg1(NULL, M_ERROR, 0, "pthread_cond_broadcast: ERR=%s\n", be.strerror(stat)); + pthread_mutex_unlock(&jq->mutex); + return stat; + } + } + while (jq->num_workers > 0) { + if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) { + berrno be; + Jmsg1(NULL, M_ERROR, 0, "pthread_cond_wait: ERR=%s\n", be.strerror(stat)); + pthread_mutex_unlock(&jq->mutex); + return stat; + } + } + } + if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) { + berrno be; + Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_unlock: ERR=%s\n", be.strerror(stat)); + return stat; + } + stat = pthread_mutex_destroy(&jq->mutex); + stat1 = pthread_cond_destroy(&jq->work); + stat2 = pthread_attr_destroy(&jq->attr); + delete jq->waiting_jobs; + delete jq->running_jobs; + delete jq->ready_jobs; + return (stat != 0 ? stat : (stat1 != 0 ? stat1 : stat2)); } struct wait_pkt { @@ -221,16 +228,18 @@ int jobq_add(jobq_t *jq, JCR *jcr) sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt)); sched_pkt->jcr = jcr; sched_pkt->jq = jq; - jcr->use_count--; /* release our use of jcr */ +// jcr->use_count--; /* release our use of jcr */ stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt); if (stat != 0) { /* thread not created */ - Jmsg1(jcr, M_ERROR, 0, "pthread_thread_create: ERR=%s\n", strerror(stat)); + berrno be; + Jmsg1(jcr, M_ERROR, 0, "pthread_thread_create: ERR=%s\n", be.strerror(stat)); } return stat; } if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { - Jmsg1(jcr, M_ERROR, 0, "pthread_mutex_lock: ERR=%s\n", strerror(stat)); + berrno be; + Jmsg1(jcr, M_ERROR, 0, "pthread_mutex_lock: ERR=%s\n", be.strerror(stat)); jcr->use_count--; /* release jcr */ return stat; } @@ -294,7 +303,8 @@ int jobq_remove(jobq_t *jq, JCR *jcr) } if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { - Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_lock: ERR=%s\n", strerror(stat)); + berrno be; + Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_lock: ERR=%s\n", be.strerror(stat)); return stat; } @@ -335,7 +345,8 @@ static int start_server(jobq_t *jq) if (jq->idle_workers > 0) { Dmsg0(300, "Signal worker to wake up\n"); if ((stat = pthread_cond_signal(&jq->work)) != 0) { - Jmsg1(NULL, M_ERROR, 0, "pthread_cond_signal: ERR=%s\n", strerror(stat)); + berrno be; + Jmsg1(NULL, M_ERROR, 0, "pthread_cond_signal: ERR=%s\n", be.strerror(stat)); return stat; } } else if (jq->num_workers < jq->max_workers) { @@ -343,7 +354,8 @@ static int start_server(jobq_t *jq) /* No idle threads so create a new one */ set_thread_concurrency(jq->max_workers + 1); if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) { - Jmsg1(NULL, M_ERROR, 0, "pthread_create: ERR=%s\n", strerror(stat)); + berrno be; + Jmsg1(NULL, M_ERROR, 0, "pthread_create: ERR=%s\n", be.strerror(stat)); return stat; } } @@ -368,7 +380,8 @@ void *jobq_server(void *arg) Dmsg0(300, "Start jobq_server\n"); if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { - Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_lock: ERR=%s\n", strerror(stat)); + berrno be; + Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_lock: ERR=%s\n", be.strerror(stat)); return NULL; } jq->num_workers++; @@ -425,7 +438,8 @@ void *jobq_server(void *arg) /* Release job queue lock */ if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) { - Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_unlock: ERR=%s\n", strerror(stat)); + berrno be; + Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_unlock: ERR=%s\n", be.strerror(stat)); jq->num_workers--; return NULL; } @@ -438,7 +452,8 @@ void *jobq_server(void *arg) /* Reacquire job queue lock */ if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { - Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_lock: ERR=%s\n", strerror(stat)); + berrno be; + Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_lock: ERR=%s\n", be.strerror(stat)); jq->num_workers--; free(je); /* release job entry */ return NULL; @@ -671,13 +686,15 @@ void *jobq_server(void *arg) * terminated can give us the resource. */ if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) { - Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_unlock: ERR=%s\n", strerror(stat)); + berrno be; + Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_unlock: ERR=%s\n", be.strerror(stat)); jq->num_workers--; return NULL; } bmicrosleep(2, 0); /* pause for 2 seconds */ if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { - Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_lock: ERR=%s\n", strerror(stat)); + berrno be; + Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_lock: ERR=%s\n", be.strerror(stat)); jq->num_workers--; return NULL; } @@ -689,7 +706,8 @@ void *jobq_server(void *arg) Dmsg0(200, "unlock mutex\n"); if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) { - Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_unlock: ERR=%s\n", strerror(stat)); + berrno be; + Jmsg1(NULL, M_ERROR, 0, "pthread_mutex_unlock: ERR=%s\n", be.strerror(stat)); } Dmsg0(300, "End jobq_server\n"); return NULL; -- 2.39.2