X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=bacula%2Fsrc%2Fdird%2Fjobq.c;h=c4a4c8cb8b53b922c6a3f87ea20759d5422a373c;hb=0796a980f144632ab1faa901883e619ebd291d04;hp=780b2f258fb5d51f1d3921a24e821a538599b489;hpb=e8bc66eaa7271032120d77c3b18c87d98585fcfc;p=bacula%2Fbacula diff --git a/bacula/src/dird/jobq.c b/bacula/src/dird/jobq.c old mode 100755 new mode 100644 index 780b2f258f..c4a4c8cb8b --- a/bacula/src/dird/jobq.c +++ b/bacula/src/dird/jobq.c @@ -1,22 +1,3 @@ -/* - * Bacula job queue routines. - * - * This code consists of three queues, the waiting_jobs - * queue, where jobs are initially queued, the ready_jobs - * queue, where jobs are placed when all the resources are - * allocated and they can immediately be run, and the - * running queue where jobs are placed when they are - * running. - * - * Kern Sibbald, July MMIII - * - * Version $Id$ - * - * This code was adapted from the Bacula workq, which was - * adapted from "Programming with POSIX Threads", by - * David R. Butenhof - * - */ /* Bacula® - The Network Backup Solution @@ -26,8 +7,8 @@ many others, a complete list can be found in the file AUTHORS. This program is Free Software; you can redistribute it and/or modify it under the terms of version two of the GNU General Public - License as published by the Free Software Foundation plus additions - that are listed in the file LICENSE. + License as published by the Free Software Foundation and included + in the file LICENSE. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -44,6 +25,25 @@ (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich, Switzerland, email:ftf@fsfeurope.org. */ +/* + * Bacula job queue routines. + * + * This code consists of three queues, the waiting_jobs + * queue, where jobs are initially queued, the ready_jobs + * queue, where jobs are placed when all the resources are + * allocated and they can immediately be run, and the + * running queue where jobs are placed when they are + * running. + * + * Kern Sibbald, July MMIII + * + * Version $Id$ + * + * This code was adapted from the Bacula workq, which was + * adapted from "Programming with POSIX Threads", by + * David R. Butenhof + * + */ #include "bacula.h" #include "dird.h" @@ -72,7 +72,7 @@ int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg)) if ((stat = pthread_attr_init(&jq->attr)) != 0) { berrno be; - Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.strerror(stat)); + Jmsg1(NULL, M_ERROR, 0, _("pthread_attr_init: ERR=%s\n"), be.bstrerror(stat)); return stat; } if ((stat = pthread_attr_setdetachstate(&jq->attr, PTHREAD_CREATE_DETACHED)) != 0) { @@ -81,13 +81,13 @@ int jobq_init(jobq_t *jq, int threads, void *(*engine)(void *arg)) } if ((stat = pthread_mutex_init(&jq->mutex, NULL)) != 0) { berrno be; - Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.strerror(stat)); + Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_init: ERR=%s\n"), be.bstrerror(stat)); pthread_attr_destroy(&jq->attr); return stat; } if ((stat = pthread_cond_init(&jq->work, NULL)) != 0) { berrno be; - Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.strerror(stat)); + Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_init: ERR=%s\n"), be.bstrerror(stat)); pthread_mutex_destroy(&jq->mutex); pthread_attr_destroy(&jq->attr); return stat; @@ -120,7 +120,7 @@ int jobq_destroy(jobq_t *jq) } if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { berrno be; - Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat)); + Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat)); return stat; } jq->valid = 0; /* prevent any more operations */ @@ -133,7 +133,7 @@ int jobq_destroy(jobq_t *jq) if (jq->idle_workers) { if ((stat = pthread_cond_broadcast(&jq->work)) != 0) { berrno be; - Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.strerror(stat)); + Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.bstrerror(stat)); pthread_mutex_unlock(&jq->mutex); return stat; } @@ -141,7 +141,7 @@ int jobq_destroy(jobq_t *jq) while (jq->num_workers > 0) { if ((stat = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) { berrno be; - Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.strerror(stat)); + Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_wait: ERR=%s\n"), be.bstrerror(stat)); pthread_mutex_unlock(&jq->mutex); return stat; } @@ -149,7 +149,7 @@ int jobq_destroy(jobq_t *jq) } if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) { berrno be; - Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_unlock: ERR=%s\n"), be.strerror(stat)); + Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_unlock: ERR=%s\n"), be.bstrerror(stat)); return stat; } stat = pthread_mutex_destroy(&jq->mutex); @@ -225,7 +225,7 @@ int jobq_add(jobq_t *jq, JCR *jcr) /* Initialize termination condition variable */ if ((stat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) { berrno be; - Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.strerror(stat)); + Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.bstrerror(stat)); return stat; } jcr->term_wait_inited = true; @@ -247,14 +247,14 @@ int jobq_add(jobq_t *jq, JCR *jcr) stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt); if (stat != 0) { /* thread not created */ berrno be; - Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.strerror(stat)); + Jmsg1(jcr, M_ERROR, 0, _("pthread_thread_create: ERR=%s\n"), be.bstrerror(stat)); } return stat; } if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { berrno be; - Jmsg1(jcr, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat)); + Jmsg1(jcr, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat)); free_jcr(jcr); /* release jcr */ return stat; } @@ -319,7 +319,7 @@ int jobq_remove(jobq_t *jq, JCR *jcr) if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { berrno be; - Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat)); + Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat)); return stat; } @@ -365,7 +365,7 @@ static int start_server(jobq_t *jq) Dmsg0(2300, "Signal worker to wake up\n"); if ((stat = pthread_cond_broadcast(&jq->work)) != 0) { berrno be; - Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.strerror(stat)); + Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.bstrerror(stat)); return stat; } } else if (jq->num_workers < jq->max_workers) { @@ -374,7 +374,7 @@ static int start_server(jobq_t *jq) set_thread_concurrency(jq->max_workers + 1); if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) { berrno be; - Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.strerror(stat)); + Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(stat)); return stat; } } @@ -400,7 +400,7 @@ void *jobq_server(void *arg) Dmsg0(2300, "Start jobq_server\n"); if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { berrno be; - Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.strerror(stat)); + Jmsg1(NULL, M_ERROR, 0, _("pthread_mutex_lock: ERR=%s\n"), be.bstrerror(stat)); return NULL; } jq->num_workers++; @@ -453,6 +453,7 @@ void *jobq_server(void *arg) } } jq->running_jobs->append(je); + set_jcr_in_tsd(jcr); Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId); /* Release job queue lock */ @@ -682,10 +683,10 @@ static bool acquire_resources(JCR *jcr) jcr->acquired_resource_locks = false; if (jcr->rstore) { Dmsg1(200, "Rstore=%s\n", jcr->rstore->name()); - /* - * Let only one Restore/verify job run at a time regardless - * of MaxConcurrentJobs. - */ + /* + * Let only one Restore/Verify job run at a time regardless + * of MaxConcurrentjobs. + */ if (jcr->rstore->NumConcurrentJobs == 0) { jcr->rstore->NumConcurrentJobs = 1; Dmsg0(200, "Set rncj=1\n");