/*
Bacula® - The Network Backup Solution
- Copyright (C) 2003-2008 Free Software Foundation Europe e.V.
+ Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
The main author of Bacula is Kern Sibbald, with contributions from
many others, a complete list can be found in the file AUTHORS.
This program is Free Software; you can redistribute it and/or
- modify it under the terms of version two of the GNU General Public
+ modify it under the terms of version three of the GNU Affero General Public
License as published by the Free Software Foundation and included
in the file LICENSE.
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
- You should have received a copy of the GNU General Public License
+ You should have received a copy of the GNU Affero General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.
*
* Kern Sibbald, July MMIII
*
- * Version $Id$
*
* This code was adapted from the Bacula workq, which was
* adapted from "Programming with POSIX Threads", by
static int start_server(jobq_t *jq);
static bool acquire_resources(JCR *jcr);
static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
-static void dec_read_store(JCR *jcr);
static void dec_write_store(JCR *jcr);
/*
if (jq->valid != JOBQ_VALID) {
return EINVAL;
}
- if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
- berrno be;
- Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
- return stat;
- }
+ P(jq->mutex);
jq->valid = 0; /* prevent any more operations */
/*
if ((stat = pthread_cond_broadcast(&jq->work)) != 0) {
berrno be;
Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.bstrerror(stat));
- pthread_mutex_unlock(&jq->mutex);
+ V(jq->mutex);
return stat;
}
}
if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
berrno be;
Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.bstrerror(stat));
- pthread_mutex_unlock(&jq->mutex);
+ V(jq->mutex);
return stat;
}
}
}
- if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
- berrno be;
- Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_unlock: ERR=%s\n"), be.bstrerror(stat));
- return stat;
- }
+ V(jq->mutex);
stat = pthread_mutex_destroy(&jq->mutex);
stat1 = pthread_cond_destroy(&jq->work);
stat2 = pthread_attr_destroy(&jq->attr);
JCR *jcr = ((wait_pkt *)arg)->jcr;
jobq_t *jq = ((wait_pkt *)arg)->jq;
- set_jcr_in_tsd(jcr);
+ set_jcr_in_tsd(INVALID_JCR);
Dmsg0(2300, "Enter sched_wait.\n");
free(arg);
time_t wtime = jcr->sched_time - time(NULL);
return stat;
}
- if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
- berrno be;
- Jmsg1(jcr, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
- free_jcr(jcr); /* release jcr */
- return stat;
- }
+ P(jq->mutex);
if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
free_jcr(jcr); /* release jcr */
/* Ensure that at least one server looks at the queue. */
stat = start_server(jq);
- pthread_mutex_unlock(&jq->mutex);
+ V(jq->mutex);
Dmsg0(2300, "Return jobq_add\n");
return stat;
}
return EINVAL;
}
- if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
- berrno be;
- Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
- return stat;
- }
-
+ P(jq->mutex);
foreach_dlist(item, jq->waiting_jobs) {
if (jcr == item->jcr) {
found = true;
}
}
if (!found) {
- pthread_mutex_unlock(&jq->mutex);
+ V(jq->mutex);
Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x not in wait queue\n", jcr->JobId, jcr);
return EINVAL;
}
stat = start_server(jq);
- pthread_mutex_unlock(&jq->mutex);
+ V(jq->mutex);
Dmsg0(2300, "Return jobq_remove\n");
return stat;
}
Dmsg0(2300, "Create worker thread\n");
/* No idle threads so create a new one */
set_thread_concurrency(jq->max_workers + 1);
+ jq->num_workers++;
if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
berrno be;
+ jq->num_workers--;
Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat));
return stat;
}
set_jcr_in_tsd(INVALID_JCR);
Dmsg0(2300, "Start jobq_server\n");
- if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
- berrno be;
- Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat));
- return NULL;
- }
- jq->num_workers++;
+ P(jq->mutex);
for (;;) {
struct timeval tv;
/* This shouldn't happen */
Dmsg0(2300, "This shouldn't happen\n");
jq->num_workers--;
- pthread_mutex_unlock(&jq->mutex);
+ V(jq->mutex);
return NULL;
}
break;
Dmsg0(2300, "ready queue not empty start server\n");
if (start_server(jq) != 0) {
jq->num_workers--;
- pthread_mutex_unlock(&jq->mutex);
+ V(jq->mutex);
return NULL;
}
}
jq->running_jobs->append(je);
/* Attach jcr to this thread while we run the job */
+ jcr->set_killable(true);
set_jcr_in_tsd(jcr);
Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);
V(jq->mutex);
/* Call user's routine here */
- Dmsg2(2300, "Calling user engine for jobid=%d use=%d\n", jcr->JobId,
- jcr->use_count());
+ Dmsg3(2300, "Calling user engine for jobid=%d use=%d stat=%c\n", jcr->JobId,
+ jcr->use_count(), jcr->JobStatus);
jq->engine(je->jcr);
/* Job finished detach from thread */
- set_jcr_in_tsd(INVALID_JCR);
+ remove_jcr_from_tsd(je->jcr);
+ je->jcr->set_killable(false);
Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
jcr->use_count());
*/
static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je)
{
+ bool resched = false;
/*
- * Reschedule the job if necessary and requested
+ * Reschedule the job if requested and possible
*/
- if (jcr->job->RescheduleOnError &&
- jcr->JobStatus != JS_Terminated &&
- jcr->JobStatus != JS_Canceled &&
- jcr->get_JobType() == JT_BACKUP &&
- (jcr->job->RescheduleTimes == 0 ||
- jcr->reschedule_count < jcr->job->RescheduleTimes)) {
+ /* Basic condition is that more reschedule times remain */
+ if (jcr->job->RescheduleTimes == 0 ||
+ jcr->reschedule_count < jcr->job->RescheduleTimes) {
+ resched =
+ /* Check for incomplete jobs */
+ (jcr->job->RescheduleIncompleteJobs &&
+ jcr->is_incomplete() && jcr->is_JobType(JT_BACKUP)) ||
+ /* Check for failed jobs */
+ (jcr->job->RescheduleOnError &&
+ !jcr->is_JobStatus(JS_Terminated) &&
+ !jcr->is_JobStatus(JS_Canceled) &&
+ jcr->is_JobType(JT_BACKUP));
+ }
+ if (resched) {
char dt[50], dt2[50];
/*
* Reschedule this job by cleaning it up, but
* reuse the same JobId if possible.
*/
+ jcr->incomplete = jcr->is_incomplete(); /* save incomplete status */
time_t now = time(NULL);
jcr->reschedule_count++;
jcr->sched_time = now + jcr->job->RescheduleInterval;
jcr->JobStatus = -1;
set_jcr_job_status(jcr, JS_WaitStartTime);
jcr->SDJobStatus = 0;
+ jcr->JobErrors = 0;
if (!allow_duplicate_job(jcr)) {
return false;
}
- if (jcr->JobBytes == 0) {
+ /* Only jobs with no output or Incomplete jobs can run on same JCR */
+ if (jcr->JobBytes == 0 || jcr->incomplete) {
Dmsg2(2300, "Requeue job=%d use=%d\n", jcr->JobId, jcr->use_count());
V(jq->mutex);
jobq_add(jq, jcr); /* queue the job to run again */
set_jcr_defaults(njcr, jcr->job);
njcr->reschedule_count = jcr->reschedule_count;
njcr->sched_time = jcr->sched_time;
- njcr->set_JobLevel(jcr->get_JobLevel());
+ njcr->set_JobLevel(jcr->getJobLevel());
njcr->pool = jcr->pool;
njcr->run_pool_override = jcr->run_pool_override;
njcr->full_pool = jcr->full_pool;
* but we do not really have enough information here to
* know if this is really a deadlock (it may be a dual drive
* autochanger), and in principle, the SD reservation system
- * should detect these deadlocks, so push the work off on is.
+ * should detect these deadlocks, so push the work off on it.
*/
#ifdef xxx
if (jcr->rstore && jcr->rstore == jcr->wstore) { /* possible deadlock */
#endif
if (jcr->rstore) {
Dmsg1(200, "Rstore=%s\n", jcr->rstore->name());
- if (jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs) {
- jcr->rstore->NumConcurrentReadJobs++;
- jcr->rstore->NumConcurrentJobs++;
- Dmsg1(200, "Inc rncj=%d\n", jcr->rstore->NumConcurrentJobs);
- } else {
+ if (!inc_read_store(jcr)) {
Dmsg1(200, "Fail rncj=%d\n", jcr->rstore->NumConcurrentJobs);
set_jcr_job_status(jcr, JS_WaitStoreRes);
return false;
return true;
}
-static void dec_read_store(JCR *jcr)
+static pthread_mutex_t rstore_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+/*
+ * Try to reserve one concurrent-job slot on the job's read storage.
+ *
+ * While holding rstore_mutex, compares jcr->rstore->NumConcurrentJobs
+ *  against jcr->rstore->MaxConcurrentJobs. If a slot is free, both
+ *  NumConcurrentReadJobs and NumConcurrentJobs are incremented.
+ *
+ * Returns: true  if the counters were incremented
+ *          false if the storage is already at MaxConcurrentJobs
+ *
+ * jcr->rstore is dereferenced without a NULL check, so the caller
+ *  must make sure it is set before calling.
+ *
+ * Note: inc_read_store() and dec_read_store() are
+ *  called from select_rstore() in src/dird/restore.c
+ */
+bool inc_read_store(JCR *jcr)
+{
+   bool reserved = false;
+
+   P(rstore_mutex);
+   if (jcr->rstore->NumConcurrentJobs < jcr->rstore->MaxConcurrentJobs) {
+      jcr->rstore->NumConcurrentReadJobs++;
+      jcr->rstore->NumConcurrentJobs++;
+      Dmsg1(200, "Inc rncj=%d\n", jcr->rstore->NumConcurrentJobs);
+      reserved = true;
+   }
+   V(rstore_mutex);                   /* single unlock for both outcomes */
+   return reserved;
+}
+
+void dec_read_store(JCR *jcr)
{
if (jcr->rstore) {
+ P(rstore_mutex);
jcr->rstore->NumConcurrentReadJobs--; /* back out rstore */
jcr->rstore->NumConcurrentJobs--; /* back out rstore */
Dmsg1(200, "Dec rncj=%d\n", jcr->rstore->NumConcurrentJobs);
+ V(rstore_mutex);
ASSERT(jcr->rstore->NumConcurrentReadJobs >= 0);
ASSERT(jcr->rstore->NumConcurrentJobs >= 0);
}