From 51a73ce3e75a35d88a6878a4a6bc591f27ef6576 Mon Sep 17 00:00:00 2001 From: Kern Sibbald Date: Mon, 21 Jul 2003 13:51:21 +0000 Subject: [PATCH] Fix rescheduling on error for new job queue code git-svn-id: https://bacula.svn.sourceforge.net/svnroot/bacula/trunk@640 91ce42f0-d328-0410-95d8-f526ca767f89 --- bacula/src/dird/dird.c | 1 - bacula/src/dird/job.c | 29 ++--- bacula/src/dird/jobq.c | 62 +++++++++-- bacula/src/dird/protos.h | 1 + bacula/src/dird/ua_cmds.c | 10 +- bacula/src/dird/ua_run.c | 215 +++++++++++++++++++------------------- 6 files changed, 170 insertions(+), 148 deletions(-) diff --git a/bacula/src/dird/dird.c b/bacula/src/dird/dird.c index f2960894db..f6f0213c71 100644 --- a/bacula/src/dird/dird.c +++ b/bacula/src/dird/dird.c @@ -44,7 +44,6 @@ extern void term_ua_server(); extern int do_backup(JCR *jcr); extern void backup_cleanup(void); extern void start_UA_server(char *addr, int port); -extern void run_job(JCR *jcr); extern void init_job_server(int max_workers); static char *configfile = NULL; diff --git a/bacula/src/dird/job.c b/bacula/src/dird/job.c index e026dffbc6..75ea929ffb 100644 --- a/bacula/src/dird/job.c +++ b/bacula/src/dird/job.c @@ -32,14 +32,13 @@ /* Forward referenced subroutines */ static void *job_thread(void *arg); static char *edit_run_codes(JCR *jcr, char *omsg, char *imsg); -static void release_resource_locks(JCR *jcr); static int acquire_resource_locks(JCR *jcr); #ifdef USE_SEMAPHORE static void backoff_resource_locks(JCR *jcr, int count); +static void release_resource_locks(JCR *jcr); #endif /* Exported subroutines */ -void run_job(JCR *jcr); /* Imported subroutines */ @@ -58,9 +57,6 @@ static int waiting = 0; /* count of waiting threads */ #else #ifdef JOB_QUEUE jobq_t job_queue; -#else -/* Queue of jobs to be run */ -workq_t job_wq; /* our job work queue */ #endif #endif @@ -83,11 +79,6 @@ void init_job_server(int max_workers) if ((stat = jobq_init(&job_queue, max_workers, job_thread)) != 0) { Emsg1(M_ABORT, 0, _("Could not init job queue: ERR=%s\n"), strerror(stat)); } -#else - /* This is the OLD work queue code to go away */ - if ((stat = workq_init(&job_wq, max_workers, job_thread)) != 0) { - Emsg1(M_ABORT, 0, _("Could not init job work queue: ERR=%s\n"), strerror(stat)); - } #endif #endif return; @@ -103,10 +94,6 @@ void run_job(JCR *jcr) int stat, errstat; #ifdef USE_SEMAPHORE pthread_t tid; -#else -#ifndef JOB_QUEUE - workq_ele_t *work_item; -#endif #endif sm_check(__FILE__, __LINE__, True); @@ -171,12 +158,6 @@ void run_job(JCR *jcr) if ((stat = jobq_add(&job_queue, jcr)) != 0) { Emsg1(M_ABORT, 0, _("Could not add job queue: ERR=%s\n"), strerror(stat)); } -#else - /* Queue the job to be run */ - if ((stat = workq_add(&job_wq, (void *)jcr, &work_item, 0)) != 0) { - Emsg1(M_ABORT, 0, _("Could not add job to work queue: ERR=%s\n"), strerror(stat)); - } - jcr->work_item = work_item; #endif #endif Dmsg0(200, "Done run_job()\n"); @@ -285,6 +266,7 @@ static void *job_thread(void *arg) } } bail_out: +#ifndef JOB_QUEUE release_resource_locks(jcr); if (jcr->job->RescheduleOnError && jcr->JobStatus != JS_Terminated && @@ -321,15 +303,16 @@ bail_out: njcr->messages = jcr->messages; run_job(njcr); } +#endif break; } +#ifndef JOB_QUEUE if (jcr->db) { Dmsg0(200, "Close DB\n"); db_close_database(jcr, jcr->db); jcr->db = NULL; } -#ifndef JOB_QUEUE free_jcr(jcr); #endif Dmsg0(50, "======== End Job ==========\n"); @@ -482,12 +465,12 @@ static void backoff_resource_locks(JCR *jcr, int count) * there are any other jobs waiting, we wake them * up so that they can try again. */ +#ifdef USE_SEMAPHORE static void release_resource_locks(JCR *jcr) { if (!jcr->acquired_resource_locks) { return; /* Job canceled, no locks acquired */ } -#ifdef USE_SEMAPHORE P(mutex); sem_unlock(&jcr->store->sem); sem_unlock(&jcr->client->sem); @@ -498,8 +481,8 @@ static void release_resource_locks(JCR *jcr) } jcr->acquired_resource_locks = false; V(mutex); -#endif } +#endif /* * Get or create a Client record for this Job diff --git a/bacula/src/dird/jobq.c b/bacula/src/dird/jobq.c index a16f6b48fa..4496446b00 100755 --- a/bacula/src/dird/jobq.c +++ b/bacula/src/dird/jobq.c @@ -375,7 +375,9 @@ static void *jobq_server(void *arg) */ Dmsg0(100, "Checking ready queue.\n"); while (!jq->ready_jobs->empty() && !jq->quit) { + JCR *jcr; je = (jobq_item_t *)jq->ready_jobs->first(); + jcr = je->jcr; jq->ready_jobs->remove(je); if (!jq->ready_jobs->empty()) { Dmsg0(100, "ready queue not empty start server\n"); @@ -384,14 +386,14 @@ static void *jobq_server(void *arg) } } jq->running_jobs->append(je); - Dmsg1(100, "Took jobid=%d from ready and appended to run\n", je->jcr->JobId); + Dmsg1(100, "Took jobid=%d from ready and appended to run\n", jcr->JobId); if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) { return NULL; } /* Call user's routine here */ - Dmsg1(100, "Calling user engine for jobid=%d\n", je->jcr->JobId); + Dmsg1(100, "Calling user engine for jobid=%d\n", jcr->JobId); jq->engine(je->jcr); - Dmsg1(100, "Back from user engine jobid=%d.\n", je->jcr->JobId); + Dmsg1(100, "Back from user engine jobid=%d.\n", jcr->JobId); if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { free(je); /* release job entry */ return NULL; @@ -403,12 +405,56 @@ static void *jobq_server(void *arg) * been acquired for jobs canceled before they were * put into the ready queue. */ - if (je->jcr->acquired_resource_locks) { - je->jcr->store->NumConcurrentJobs--; - je->jcr->client->NumConcurrentJobs--; - je->jcr->job->NumConcurrentJobs--; + if (jcr->acquired_resource_locks) { + jcr->store->NumConcurrentJobs--; + jcr->client->NumConcurrentJobs--; + jcr->job->NumConcurrentJobs--; } - free_jcr(je->jcr); + + if (jcr->job->RescheduleOnError && + jcr->JobStatus != JS_Terminated && + jcr->JobStatus != JS_Canceled && + jcr->job->RescheduleTimes > 0 && + jcr->reschedule_count < jcr->job->RescheduleTimes) { + + /* + * Reschedule this job by cleaning it up, but + * reuse the same JobId if possible. + */ + jcr->reschedule_count++; + jcr->sched_time = time(NULL) + jcr->job->RescheduleInterval; + Dmsg2(100, "Rescheduled Job %s to re-run in %d seconds.\n", jcr->Job, + (int)jcr->job->RescheduleInterval); + jcr->JobStatus = JS_Created; /* force new status */ + dird_free_jcr(jcr); /* partial cleanup old stuff */ + if (jcr->JobBytes == 0) { + jobq_add(jq, jcr); /* queue the job to run again */ + free(je); /* free the job entry */ + continue; + } + /* + * Something was actually backed up, so we cannot reuse + * the old JobId or there will be database record + * conflicts. We now create a new job, copying the + * appropriate fields. + */ + JCR *njcr = new_jcr(sizeof(JCR), dird_free_jcr); + set_jcr_defaults(njcr, jcr->job); + njcr->reschedule_count = jcr->reschedule_count; + njcr->JobLevel = jcr->JobLevel; + njcr->JobStatus = jcr->JobStatus; + njcr->pool = jcr->pool; + njcr->store = jcr->store; + njcr->messages = jcr->messages; + run_job(njcr); + } + /* Clean up and release old jcr */ + if (jcr->db) { + Dmsg0(200, "Close DB\n"); + db_close_database(jcr, jcr->db); + jcr->db = NULL; + } + free_jcr(jcr); free(je); /* release job entry */ } /* diff --git a/bacula/src/dird/protos.h b/bacula/src/dird/protos.h index 6614050f80..e0dc391bf9 100644 --- a/bacula/src/dird/protos.h +++ b/bacula/src/dird/protos.h @@ -85,6 +85,7 @@ extern void set_jcr_defaults(JCR *jcr, JOB *job); extern void create_unique_job_name(JCR *jcr, char *base_name); extern void update_job_end_record(JCR *jcr); extern int get_or_create_client_record(JCR *jcr); +extern void run_job(JCR *jcr); /* mountreq.c */ extern void mount_request(JCR *jcr, BSOCK *bs, char *buf); diff --git a/bacula/src/dird/ua_cmds.c b/bacula/src/dird/ua_cmds.c index 01099a1060..e8dfaaf34f 100644 --- a/bacula/src/dird/ua_cmds.c +++ b/bacula/src/dird/ua_cmds.c @@ -37,12 +37,8 @@ extern int r_first; extern int r_last; extern struct s_res resources[]; extern char my_name[]; -#ifndef USE_SEMAPHORE #ifdef JOB_QUEUE extern jobq_t job_queue; /* job queue */ -#else -extern workq_t job_wq; /* work queue */ -#endif #endif extern char *list_pool; @@ -435,14 +431,10 @@ static int cancelcmd(UAContext *ua, char *cmd) set_jcr_job_status(jcr, JS_Canceled); bsendmsg(ua, _("JobId %d, Job %s marked to be canceled.\n"), jcr->JobId, jcr->Job); -#ifndef USE_SEMAPHORE #ifdef JOB_QUEUE jobq_remove(&job_queue, jcr); /* attempt to remove it from queue */ -#else - workq_remove(&job_wq, jcr->work_item); /* attempt to remove it from queue */ -#endif #endif - free_jcr(jcr); + free_jcr(jcr); /* this decrements the use count only */ return 1; default: diff --git a/bacula/src/dird/ua_run.c b/bacula/src/dird/ua_run.c index 0144c45bd7..9a4a559dce 100644 --- a/bacula/src/dird/ua_run.c +++ b/bacula/src/dird/ua_run.c @@ -31,7 +31,6 @@ #include "dird.h" /* Imported subroutines */ -extern void run_job(JCR *jcr); /* Imported variables */ extern struct s_jl joblevels[]; @@ -71,9 +70,11 @@ int runcmd(UAContext *ua, char *cmd) N_("replace"), N_("when"), N_("priority"), - N_("yes"), /* 12 -- see below */ + N_("yes"), /* 12 -- if you change this change YES_POS too */ NULL}; +#define YES_POS 12 + if (!open_db(ua)) { return 1; } @@ -96,116 +97,116 @@ int runcmd(UAContext *ua, char *cmd) for (j=0; !found && kw[j]; j++) { if (strcasecmp(ua->argk[i], _(kw[j])) == 0) { /* Note, yes has no value, so do not err */ - if (!ua->argv[i] && j != 11 /*yes*/) { + if (!ua->argv[i] && j != YES_POS /*yes*/) { bsendmsg(ua, _("Value missing for keyword %s\n"), ua->argk[i]); return 1; } Dmsg1(200, "Got keyword=%s\n", kw[j]); switch (j) { - case 0: /* job */ - if (job_name) { - bsendmsg(ua, _("Job name specified twice.\n")); - return 1; - } - job_name = ua->argv[i]; - found = True; - break; - case 1: /* JobId */ - if (jid) { - bsendmsg(ua, _("JobId specified twice.\n")); - return 1; - } - jid = ua->argv[i]; - found = True; - break; - case 2: /* client */ - if (client_name) { - bsendmsg(ua, _("Client specified twice.\n")); - return 1; - } - client_name = ua->argv[i]; - found = True; - break; - case 3: /* fileset */ - if (fileset_name) { - bsendmsg(ua, _("FileSet specified twice.\n")); - return 1; - } - fileset_name = ua->argv[i]; - found = True; - break; - case 4: /* level */ - if (level_name) { - bsendmsg(ua, _("Level specified twice.\n")); - return 1; - } - level_name = ua->argv[i]; - found = True; - break; - case 5: /* storage */ - if (store_name) { - bsendmsg(ua, _("Storage specified twice.\n")); - return 1; - } - store_name = ua->argv[i]; - found = True; - break; - case 6: /* pool */ - if (pool_name) { - bsendmsg(ua, _("Pool specified twice.\n")); - return 1; - } - pool_name = ua->argv[i]; - found = True; - break; - case 7: /* where */ - if (where) { - bsendmsg(ua, _("Where specified twice.\n")); - return 1; - } - where = ua->argv[i]; - found = True; - break; - case 8: /* bootstrap */ - if (bootstrap) { - bsendmsg(ua, _("Bootstrap specified twice.\n")); - return 1; - } - bootstrap = ua->argv[i]; - found = True; - break; - case 9: /* replace */ - if (replace) { - bsendmsg(ua, _("Replace specified twice.\n")); - return 1; - } - replace = ua->argv[i]; - found = True; - break; - case 10: /* When */ - if (when) { - bsendmsg(ua, _("When specified twice.\n")); - return 1; - } - when = ua->argv[i]; - found = True; - break; - case 11: /* Priority */ - if (Priority) { - bsendmsg(ua, _("Priority specified twice.\n")); - return 1; - } - Priority = atoi(ua->argv[i]); - if (Priority <= 0) { - bsendmsg(ua, _("Priority must be positive nonzero setting it to 10.\n")); - Priority = 10; - } - break; - case 12: /* yes */ - found = True; - break; - default: - break; + case 0: /* job */ + if (job_name) { + bsendmsg(ua, _("Job name specified twice.\n")); + return 1; + } + job_name = ua->argv[i]; + found = True; + break; + case 1: /* JobId */ + if (jid) { + bsendmsg(ua, _("JobId specified twice.\n")); + return 1; + } + jid = ua->argv[i]; + found = True; + break; + case 2: /* client */ + if (client_name) { + bsendmsg(ua, _("Client specified twice.\n")); + return 1; + } + client_name = ua->argv[i]; + found = True; + break; + case 3: /* fileset */ + if (fileset_name) { + bsendmsg(ua, _("FileSet specified twice.\n")); + return 1; + } + fileset_name = ua->argv[i]; + found = True; + break; + case 4: /* level */ + if (level_name) { + bsendmsg(ua, _("Level specified twice.\n")); + return 1; + } + level_name = ua->argv[i]; + found = True; + break; + case 5: /* storage */ + if (store_name) { + bsendmsg(ua, _("Storage specified twice.\n")); + return 1; + } + store_name = ua->argv[i]; + found = True; + break; + case 6: /* pool */ + if (pool_name) { + bsendmsg(ua, _("Pool specified twice.\n")); + return 1; + } + pool_name = ua->argv[i]; + found = True; + break; + case 7: /* where */ + if (where) { + bsendmsg(ua, _("Where specified twice.\n")); + return 1; + } + where = ua->argv[i]; + found = True; + break; + case 8: /* bootstrap */ + if (bootstrap) { + bsendmsg(ua, _("Bootstrap specified twice.\n")); + return 1; + } + bootstrap = ua->argv[i]; + found = True; + break; + case 9: /* replace */ + if (replace) { + bsendmsg(ua, _("Replace specified twice.\n")); + return 1; + } + replace = ua->argv[i]; + found = True; + break; + case 10: /* When */ + if (when) { + bsendmsg(ua, _("When specified twice.\n")); + return 1; + } + when = ua->argv[i]; + found = True; + break; + case 11: /* Priority */ + if (Priority) { + bsendmsg(ua, _("Priority specified twice.\n")); + return 1; + } + Priority = atoi(ua->argv[i]); + if (Priority <= 0) { + bsendmsg(ua, _("Priority must be positive nonzero setting it to 10.\n")); + Priority = 10; + } + break; + case 12: /* yes */ + found = True; + break; + default: + break; } } /* end strcase compare */ } /* end keyword loop */ -- 2.39.5