From 649b0ba92b3c8d3694d36cf8ae8e251f0f464d3b Mon Sep 17 00:00:00 2001 From: Kern Sibbald Date: Sun, 24 Aug 2003 14:56:29 +0000 Subject: [PATCH] Drop old semaphore and workq code git-svn-id: https://bacula.svn.sourceforge.net/svnroot/bacula/trunk@676 91ce42f0-d328-0410-95d8-f526ca767f89 --- bacula/src/dird/dird_conf.h | 16 --- bacula/src/dird/job.c | 257 +----------------------------------- bacula/src/dird/jobq.c | 3 - bacula/src/dird/ua_cmds.c | 4 - bacula/src/lib/tree.h | 2 +- bacula/src/version.h | 7 - 6 files changed, 4 insertions(+), 285 deletions(-) diff --git a/bacula/src/dird/dird_conf.h b/bacula/src/dird/dird_conf.h index 1ac59ff1e0..247610f1a5 100644 --- a/bacula/src/dird/dird_conf.h +++ b/bacula/src/dird/dird_conf.h @@ -146,12 +146,7 @@ struct CLIENT { char *password; CAT *catalog; /* Catalog resource */ uint32_t MaxConcurrentJobs; /* Maximume concurrent jobs */ -#ifdef USE_SEMAPHORE - semlock_t sem; /* storage semaphore */ -#endif -#ifdef JOB_QUEUE uint32_t NumConcurrentJobs; /* number of concurrent jobs running */ -#endif int enable_ssl; /* Use SSL */ }; @@ -170,12 +165,7 @@ struct STORE { char *dev_name; int autochanger; /* set if autochanger */ uint32_t MaxConcurrentJobs; /* Maximume concurrent jobs */ -#ifdef USE_SEMAPHORE - semlock_t sem; /* storage semaphore */ -#endif -#ifdef JOB_QUEUE uint32_t NumConcurrentJobs; /* number of concurrent jobs running */ -#endif int enable_ssl; /* Use SSL */ }; @@ -215,13 +205,7 @@ struct JOB { FILESET *fileset; /* What to backup -- Fileset */ STORE *storage; /* Where is device -- Storage daemon */ POOL *pool; /* Where is media -- Media Pool */ - -#ifdef USE_SEMAPHORE - semlock_t sem; /* storage semaphore */ -#endif -#ifdef JOB_QUEUE uint32_t NumConcurrentJobs; /* number of concurrent jobs running */ -#endif }; #define MAX_FOPTS 30 diff --git a/bacula/src/dird/job.c b/bacula/src/dird/job.c index 3987beee4d..2878a9e634 100644 --- a/bacula/src/dird/job.c +++ b/bacula/src/dird/job.c @@ -31,11 +31,6 @@ /* Forward referenced subroutines */ static void *job_thread(void *arg); -static int acquire_resource_locks(JCR *jcr); -#ifdef USE_SEMAPHORE -static void backoff_resource_locks(JCR *jcr, int count); -static void release_resource_locks(JCR *jcr); -#endif /* Exported subroutines */ @@ -48,38 +43,14 @@ extern int do_admin(JCR *jcr); extern int do_restore(JCR *jcr); extern int do_verify(JCR *jcr); -#ifdef USE_SEMAPHORE -static semlock_t job_lock; -static pthread_mutex_t mutex; -static pthread_cond_t resource_wait; -static int waiting = 0; /* count of waiting threads */ -#else -#ifdef JOB_QUEUE jobq_t job_queue; -#endif -#endif void init_job_server(int max_workers) { int stat; -#ifdef USE_SEMAPHORE - if ((stat = sem_init(&job_lock, max_workers)) != 0) { - Emsg1(M_ABORT, 0, _("Could not init job lock: ERR=%s\n"), strerror(stat)); - } - if ((stat = pthread_mutex_init(&mutex, NULL)) != 0) { - Emsg1(M_ABORT, 0, _("Could not init resource mutex: ERR=%s\n"), strerror(stat)); - } - if ((stat = pthread_cond_init(&resource_wait, NULL)) != 0) { - Emsg1(M_ABORT, 0, _("Could not init resource wait: ERR=%s\n"), strerror(stat)); - } - -#else -#ifdef JOB_QUEUE if ((stat = jobq_init(&job_queue, max_workers, job_thread)) != 0) { Emsg1(M_ABORT, 0, _("Could not init job queue: ERR=%s\n"), strerror(stat)); } -#endif -#endif return; } @@ -91,9 +62,6 @@ void init_job_server(int max_workers) void run_job(JCR *jcr) { int stat, errstat; -#ifdef USE_SEMAPHORE - pthread_t tid; -#endif sm_check(__FILE__, __LINE__, True); init_msg(jcr, jcr->messages); @@ -151,25 +119,18 @@ void run_job(JCR *jcr) jcr->JobId, jcr->Job, jcr->jr.Type, jcr->jr.Level); Dmsg0(200, "Add jrc to work queue\n"); -#ifdef USE_SEMAPHORE - if ((stat = pthread_create(&tid, NULL, job_thread, (void *)jcr)) != 0) { - Emsg1(M_ABORT, 0, _("Unable to create job thread: ERR=%s\n"), strerror(stat)); - } -#else -#ifdef JOB_QUEUE /* Queue the job to be run */ if ((stat = jobq_add(&job_queue, jcr)) != 0) { Emsg1(M_ABORT, 0, _("Could not add job queue: ERR=%s\n"), strerror(stat)); } -#endif -#endif Dmsg0(100, "Done run_job()\n"); } /* - * This is the engine called by workq_add() when we were pulled + * This is the engine called by job_add() when we were pulled * from the work queue. - * At this point, we are running in our own thread + * At this point, we are running in our own thread and all + * necessary resources are allocated -- see jobq.c */ static void *job_thread(void *arg) { @@ -179,9 +140,6 @@ static void *job_thread(void *arg) sm_check(__FILE__, __LINE__, True); for ( ;; ) { - if (!acquire_resource_locks(jcr)) { - set_jcr_job_status(jcr, JS_Canceled); - } Dmsg0(200, "=====Start Job=========\n"); jcr->start_time = time(NULL); /* set the real start time */ @@ -269,223 +227,14 @@ static void *job_thread(void *arg) } } bail_out: -#ifndef JOB_QUEUE - release_resource_locks(jcr); - if (jcr->job->RescheduleOnError && - jcr->JobStatus != JS_Terminated && - jcr->JobStatus != JS_Canceled && - jcr->job->RescheduleTimes > 0 && - jcr->reschedule_count < jcr->job->RescheduleTimes) { - - /* - * Reschedule this job by cleaning it up, but - * reuse the same JobId if possible. - */ - jcr->reschedule_count++; - jcr->sched_time = time(NULL) + jcr->job->RescheduleInterval; - Dmsg2(100, "Rescheduled Job %s to re-run in %d seconds.\n", jcr->Job, - (int)jcr->job->RescheduleInterval); - jcr->JobStatus = JS_Created; /* force new status */ - dird_free_jcr(jcr); /* partial cleanup old stuff */ - if (jcr->JobBytes == 0) { - continue; /* reschedule the job */ - } - /* - * Something was actually backed up, so we cannot reuse - * the old JobId or there will be database record - * conflicts. We now create a new job, copying the - * appropriate fields. - */ - JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr); - set_jcr_defaults(njcr, jcr->job); - njcr->reschedule_count = jcr->reschedule_count; - njcr->JobLevel = jcr->JobLevel; - njcr->JobStatus = jcr->JobStatus; - njcr->pool = jcr->pool; - njcr->store = jcr->store; - njcr->messages = jcr->messages; - run_job(njcr); - } -#endif break; } -#ifndef JOB_QUEUE - if (jcr->db) { - Dmsg0(200, "Close DB\n"); - db_close_database(jcr, jcr->db); - jcr->db = NULL; - } - free_jcr(jcr); -#endif Dmsg0(50, "======== End Job ==========\n"); sm_check(__FILE__, __LINE__, True); return NULL; } -/* - * Acquire the resources needed. These locks limit the - * number of jobs by each resource. We have limits on - * Jobs, Clients, Storage, and total jobs. - */ -static int acquire_resource_locks(JCR *jcr) -{ -#ifndef JOB_QUEUE - time_t now = time(NULL); - time_t wtime = jcr->sched_time - now; - - /* Wait until scheduled time arrives */ - if (wtime > 0 && verbose) { - Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"), - jcr->Job, wtime); - set_jcr_job_status(jcr, JS_WaitStartTime); - } - /* Check every 30 seconds if canceled */ - while (wtime > 0) { - Dmsg2(100, "Waiting on sched time, jobid=%d secs=%d\n", jcr->JobId, wtime); - if (wtime > 30) { - wtime = 30; - } - bmicrosleep(wtime, 0); - if (job_canceled(jcr)) { - return 0; - } - wtime = jcr->sched_time - time(NULL); - } -#endif - - -#ifdef USE_SEMAPHORE - int stat; - - /* Initialize semaphores */ - if (jcr->store->sem.valid != SEMLOCK_VALID) { - if ((stat = sem_init(&jcr->store->sem, jcr->store->MaxConcurrentJobs)) != 0) { - Emsg1(M_ABORT, 0, _("Could not init Storage semaphore: ERR=%s\n"), strerror(stat)); - } - } - if (jcr->client->sem.valid != SEMLOCK_VALID) { - if ((stat = sem_init(&jcr->client->sem, jcr->client->MaxConcurrentJobs)) != 0) { - Emsg1(M_ABORT, 0, _("Could not init Client semaphore: ERR=%s\n"), strerror(stat)); - } - } - if (jcr->job->sem.valid != SEMLOCK_VALID) { - if ((stat = sem_init(&jcr->job->sem, jcr->job->MaxConcurrentJobs)) != 0) { - Emsg1(M_ABORT, 0, _("Could not init Job semaphore: ERR=%s\n"), strerror(stat)); - } - } - - for ( ;; ) { - /* Acquire semaphore */ - set_jcr_job_status(jcr, JS_WaitJobRes); - if ((stat = sem_lock(&jcr->job->sem)) != 0) { - Emsg1(M_ABORT, 0, _("Could not acquire Job max jobs lock: ERR=%s\n"), strerror(stat)); - } - set_jcr_job_status(jcr, JS_WaitClientRes); - if ((stat = sem_trylock(&jcr->client->sem)) != 0) { - if (stat == EBUSY) { - backoff_resource_locks(jcr, 1); - goto wait; - } else { - Emsg1(M_ABORT, 0, _("Could not acquire Client max jobs lock: ERR=%s\n"), strerror(stat)); - } - } - set_jcr_job_status(jcr, JS_WaitStoreRes); - if ((stat = sem_trylock(&jcr->store->sem)) != 0) { - if (stat == EBUSY) { - backoff_resource_locks(jcr, 2); - goto wait; - } else { - Emsg1(M_ABORT, 0, _("Could not acquire Storage max jobs lock: ERR=%s\n"), strerror(stat)); - } - } - set_jcr_job_status(jcr, JS_WaitMaxJobs); - if ((stat = sem_trylock(&job_lock)) != 0) { - if (stat == EBUSY) { - backoff_resource_locks(jcr, 3); - goto wait; - } else { - Emsg1(M_ABORT, 0, _("Could not acquire max jobs lock: ERR=%s\n"), strerror(stat)); - } - } - break; - -wait: - if (job_canceled(jcr)) { - return 0; - } - P(mutex); - /* - * Wait for a resource to be released either by backoff or - * by a job terminating. - */ - waiting++; - pthread_cond_wait(&resource_wait, &mutex); - waiting--; - V(mutex); - /* Try again */ - } - jcr->acquired_resource_locks = true; -#endif - return 1; -} - -#ifdef USE_SEMAPHORE -/* - * We could not get all the resource locks because - * too many jobs are running, so release any locks - * we did acquire, giving others a chance to use them - * while we wait. - */ -static void backoff_resource_locks(JCR *jcr, int count) -{ - P(mutex); - switch (count) { - case 3: - sem_unlock(&jcr->store->sem); - /* Fall through wanted */ - case 2: - sem_unlock(&jcr->client->sem); - /* Fall through wanted */ - case 1: - sem_unlock(&jcr->job->sem); - break; - } - /* - * Since we released a lock, if there are any threads - * waiting, wake them up so that they can try again. - */ - if (waiting > 0) { - pthread_cond_broadcast(&resource_wait); - } - V(mutex); -} -#endif - -/* - * This is called at the end of the job to release - * any resource limits on the number of jobs. If - * there are any other jobs waiting, we wake them - * up so that they can try again. - */ -#ifdef USE_SEMAPHORE -static void release_resource_locks(JCR *jcr) -{ - if (!jcr->acquired_resource_locks) { - return; /* Job canceled, no locks acquired */ - } - P(mutex); - sem_unlock(&jcr->store->sem); - sem_unlock(&jcr->client->sem); - sem_unlock(&jcr->job->sem); - sem_unlock(&job_lock); - if (waiting > 0) { - pthread_cond_broadcast(&resource_wait); - } - jcr->acquired_resource_locks = false; - V(mutex); -} -#endif /* * Get or create a Client record for this Job diff --git a/bacula/src/dird/jobq.c b/bacula/src/dird/jobq.c index c416ee7e2b..10cffd0f1c 100755 --- a/bacula/src/dird/jobq.c +++ b/bacula/src/dird/jobq.c @@ -40,7 +40,6 @@ #include "bacula.h" #include "dird.h" -#ifdef JOB_QUEUE /* Forward referenced functions */ static void *jobq_server(void *arg); @@ -569,5 +568,3 @@ static void *jobq_server(void *arg) Dmsg0(100, "End jobq_server\n"); return NULL; } - -#endif /* JOB_QUEUE */ diff --git a/bacula/src/dird/ua_cmds.c b/bacula/src/dird/ua_cmds.c index 3d10c87c29..68bceaddb8 100644 --- a/bacula/src/dird/ua_cmds.c +++ b/bacula/src/dird/ua_cmds.c @@ -37,9 +37,7 @@ extern int r_first; extern int r_last; extern struct s_res resources[]; extern char my_name[]; -#ifdef JOB_QUEUE extern jobq_t job_queue; /* job queue */ -#endif extern char *list_pool; @@ -431,9 +429,7 @@ static int cancelcmd(UAContext *ua, char *cmd) set_jcr_job_status(jcr, JS_Canceled); bsendmsg(ua, _("JobId %d, Job %s marked to be canceled.\n"), jcr->JobId, jcr->Job); -#ifdef JOB_QUEUE jobq_remove(&job_queue, jcr); /* attempt to remove it from queue */ -#endif free_jcr(jcr); /* this decrements the use count only */ return 1; diff --git a/bacula/src/lib/tree.h b/bacula/src/lib/tree.h index 1367c63abe..f106ded880 100644 --- a/bacula/src/lib/tree.h +++ b/bacula/src/lib/tree.h @@ -5,7 +5,7 @@ * */ /* - Copyright (C) 2002 Kern Sibbald and John Walker + Copyright (C) 2002,2003 Kern Sibbald and John Walker This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as diff --git a/bacula/src/version.h b/bacula/src/version.h index 338f62dda7..aa3edb782e 100644 --- a/bacula/src/version.h +++ b/bacula/src/version.h @@ -19,13 +19,6 @@ */ /* #define SEND_DMSG_TO_FILE 1 */ -/* Turn this on if you want to use the Job semaphore code */ -/* #define USE_SEMAPHORE */ - -/* Turn this on if you want to use the new Job scheduling code */ -#undef USE_SEMAPHORE -#define JOB_QUEUE 1 - /* #define NO_ATTRIBUTES_TEST 1 */ /* #define NO_TAPE_WRITE_TEST 1 */ -- 2.39.2