From eb0a543f2fe2e1a21b9b027f51225dd40406e07b Mon Sep 17 00:00:00 2001 From: Kern Sibbald Date: Tue, 23 Dec 2003 20:23:15 +0000 Subject: [PATCH] Nic's watchdog code + cleanup jcr locking/use_count git-svn-id: https://bacula.svn.sourceforge.net/svnroot/bacula/trunk@944 91ce42f0-d328-0410-95d8-f526ca767f89 --- bacula/kernstodo | 2 + bacula/src/dird/dird.c | 5 +- bacula/src/dird/dird_conf.c | 1 + bacula/src/dird/dird_conf.h | 1 + bacula/src/dird/job.c | 265 +++++++++++++++++++++++++-- bacula/src/dird/jobq.c | 21 ++- bacula/src/dird/protos.h | 2 + bacula/src/dird/run_conf.c | 1 + bacula/src/dird/ua_cmds.c | 63 +------ bacula/src/dird/ua_run.c | 187 ++++++++++--------- bacula/src/dird/ua_server.c | 12 +- bacula/src/dird/ua_status.c | 11 +- bacula/src/filed/filed.c | 5 +- bacula/src/jcr.h | 2 + bacula/src/lib/Makefile.in | 4 +- bacula/src/lib/jcr.c | 137 ++++++++++++-- bacula/src/lib/lib.h | 1 + bacula/src/lib/protos.h | 8 +- bacula/src/lib/watchdog.c | 355 +++++++++++++++--------------------- bacula/src/lib/watchdog.h | 27 +-- bacula/src/stored/stored.c | 4 +- bacula/src/version.h | 4 +- 22 files changed, 691 insertions(+), 427 deletions(-) diff --git a/bacula/kernstodo b/bacula/kernstodo index fe847d8bae..8bfbd778e2 100644 --- a/bacula/kernstodo +++ b/bacula/kernstodo @@ -55,6 +55,8 @@ For 1.33 Testing/Documentation: - Add subsections to the Disaster Recovery index section. For 1.33 +- If a tape is recycled while it is mounted, Stanislav Tvrudy must do an + additional mount to deblock the job. - Notes for integrating Nic's code: - Most likely jcr crash is the free_jcr(). 
The ref count was not incremented in the watchdog call to cancel_job() as it is in the UA -- see diff --git a/bacula/src/dird/dird.c b/bacula/src/dird/dird.c index b413042122..f839831a5d 100644 --- a/bacula/src/dird/dird.c +++ b/bacula/src/dird/dird.c @@ -214,12 +214,15 @@ int main (int argc, char *argv[]) start_watchdog(); /* start network watchdog thread */ + init_jcr_subsystem(); /* start JCR watchdogs etc. */ + init_job_server(director->MaxConcurrentJobs); Dmsg0(200, "wait for next job\n"); /* Main loop -- call scheduler to get next job to run */ while ((jcr = wait_for_next_job(runjob))) { run_job(jcr); /* run job */ + free_jcr(jcr); /* release jcr */ if (runjob) { /* command line, run a single job? */ break; /* yes, terminate */ } @@ -239,7 +242,6 @@ static void terminate_dird(int sig) already_here = TRUE; delete_pid_file(director->pid_directory, "bacula-dir", director->DIRport); - stop_watchdog(); // signal(SIGCHLD, SIG_IGN); /* don't worry about children now */ term_scheduler(); if (runjob) { @@ -254,6 +256,7 @@ static void terminate_dird(int sig) free_config_resources(); term_ua_server(); term_msg(); /* terminate message handler */ + stop_watchdog(); close_memory_pool(); /* release free memory in pool */ sm_dump(False); exit(sig != 0); diff --git a/bacula/src/dird/dird_conf.c b/bacula/src/dird/dird_conf.c index 2b4bc13786..e1fa607ab3 100644 --- a/bacula/src/dird/dird_conf.c +++ b/bacula/src/dird/dird_conf.c @@ -209,6 +209,7 @@ static struct res_items job_items[] = { {"replace", store_replace, ITEM(res_job.replace), 0, ITEM_DEFAULT, REPLACE_ALWAYS}, {"bootstrap",store_dir, ITEM(res_job.RestoreBootstrap), 0, 0, 0}, {"maxruntime", store_time, ITEM(res_job.MaxRunTime), 0, 0, 0}, + {"maxwaittime", store_time, ITEM(res_job.MaxWaitTime), 0, 0, 0}, {"maxstartdelay", store_time,ITEM(res_job.MaxStartDelay), 0, 0, 0}, {"prefixlinks", store_yesno, ITEM(res_job.PrefixLinks), 1, ITEM_DEFAULT, 0}, {"prunejobs", store_yesno, ITEM(res_job.PruneJobs), 1, ITEM_DEFAULT, 0}, 
diff --git a/bacula/src/dird/dird_conf.h b/bacula/src/dird/dird_conf.h index bec790248c..ef375599cc 100644 --- a/bacula/src/dird/dird_conf.h +++ b/bacula/src/dird/dird_conf.h @@ -191,6 +191,7 @@ struct JOB { char *WriteBootstrap; /* Where to write bootstrap Job updates */ int replace; /* How (overwrite, ..) */ utime_t MaxRunTime; /* max run time in seconds */ + utime_t MaxWaitTime; /* max blocking time in seconds */ utime_t MaxStartDelay; /* max start delay in seconds */ int PrefixLinks; /* prefix soft links with Where path */ int PruneJobs; /* Force pruning of Jobs */ diff --git a/bacula/src/dird/job.c b/bacula/src/dird/job.c index e84a6422cc..1d3fd12096 100644 --- a/bacula/src/dird/job.c +++ b/bacula/src/dird/job.c @@ -31,10 +31,13 @@ /* Forward referenced subroutines */ static void *job_thread(void *arg); +static void job_monitor_watchdog(watchdog_t *self); +static void job_monitor_destructor(watchdog_t *self); +static bool job_check_maxwaittime(JCR *control_jcr, JCR *jcr); +static bool job_check_maxruntime(JCR *control_jcr, JCR *jcr); /* Exported subroutines */ - /* Imported subroutines */ extern void term_scheduler(); extern void term_ua_server(); @@ -43,17 +46,185 @@ extern int do_admin(JCR *jcr); extern int do_restore(JCR *jcr); extern int do_verify(JCR *jcr); +/* Imported variables */ +extern time_t watchdog_time; + jobq_t job_queue; void init_job_server(int max_workers) { int stat; + watchdog_t *wd; + if ((stat = jobq_init(&job_queue, max_workers, job_thread)) != 0) { Emsg1(M_ABORT, 0, _("Could not init job queue: ERR=%s\n"), strerror(stat)); } + if ((wd = watchdog_new()) == NULL) { + Emsg0(M_ABORT, 0, _("Could not init job monitor watchdogs\n")); + } + wd->callback = job_monitor_watchdog; + wd->destructor = job_monitor_destructor; + wd->one_shot = false; + wd->interval = 60; + wd->data = create_control_jcr("*JobMonitor*", JT_SYSTEM); + register_watchdog(wd); + return; } +static void job_monitor_destructor(watchdog_t *self) +{ + JCR *control_jcr = (JCR *) 
self->data; + + free_jcr(control_jcr); +} + +static void job_monitor_watchdog(watchdog_t *self) +{ + JCR *control_jcr, *jcr; + + control_jcr = (JCR *) self->data; + + Dmsg1(200, "job_monitor_watchdog %p called\n", self); + + lock_jcr_chain(); + + for (jcr = NULL; (jcr = get_next_jcr(jcr)); /* nothing */) { + bool cancel; + + if (jcr->JobId == 0) { + Dmsg2(200, "Skipping JCR %p (%s) with JobId 0\n", + jcr, jcr->Job); + /* Keep reference counts correct */ + free_locked_jcr(jcr); + continue; + } + + /* check MaxWaitTime */ + cancel = job_check_maxwaittime(control_jcr, jcr); + + /* check MaxRunTime */ + cancel |= job_check_maxruntime(control_jcr, jcr); + + if (cancel) { + Dmsg3(200, "Cancelling JCR %p jobid %d (%s)\n", + jcr, jcr->JobId, jcr->Job); + + UAContext *ua = new_ua_context(jcr); + ua->jcr = control_jcr; + cancel_job(ua, jcr); + free_ua_context(ua); + + Dmsg1(200, "Have cancelled JCR %p\n", jcr); + } + + /* Keep reference counts correct */ + free_locked_jcr(jcr); + } + unlock_jcr_chain(); +} + +static bool job_check_maxwaittime(JCR *control_jcr, JCR *jcr) +{ + bool cancel = false; + + if (jcr->job->MaxWaitTime == 0) { + return false; + } + if ((watchdog_time - jcr->start_time) < jcr->job->MaxWaitTime) { + Dmsg3(200, "Job %p (%s) with MaxWaitTime %d not expired\n", + jcr, jcr->Job, jcr->job->MaxWaitTime); + return false; + } + Dmsg3(200, "Job %d (%s): MaxWaitTime of %d seconds exceeded, " + "checking status\n", + jcr->JobId, jcr->Job, jcr->job->MaxWaitTime); + switch (jcr->JobStatus) { + case JS_Created: + case JS_Blocked: + case JS_WaitFD: + case JS_WaitSD: + case JS_WaitStoreRes: + case JS_WaitClientRes: + case JS_WaitJobRes: + case JS_WaitPriority: + case JS_WaitMaxJobs: + case JS_WaitStartTime: + cancel = true; + Dmsg0(200, "JCR blocked in #1\n"); + break; + case JS_Running: + Dmsg0(200, "JCR running, checking SD status\n"); + switch (jcr->SDJobStatus) { + case JS_WaitMount: + case JS_WaitMedia: + case JS_WaitFD: + cancel = true; + Dmsg0(200, "JCR blocked 
in #2\n"); + break; + default: + Dmsg0(200, "JCR not blocked in #2\n"); + break; + } + break; + case JS_Terminated: + case JS_ErrorTerminated: + case JS_Canceled: + Dmsg0(200, "JCR already dead in #3\n"); + break; + default: + Emsg1(M_ABORT, 0, _("Unhandled job status code %d\n"), + jcr->JobStatus); + } + Dmsg3(200, "MaxWaitTime result: %scancel JCR %p (%s)\n", + cancel ? "" : "do not ", jcr, jcr->Job); + + return cancel; +} + +static bool job_check_maxruntime(JCR *control_jcr, JCR *jcr) +{ + bool cancel = false; + + if (jcr->job->MaxRunTime == 0) { + return false; + } + if ((watchdog_time - jcr->start_time) < jcr->job->MaxRunTime) { + Dmsg3(200, "Job %p (%s) with MaxRunTime %d not expired\n", + jcr, jcr->Job, jcr->job->MaxRunTime); + return false; + } + + switch (jcr->JobStatus) { + case JS_Created: + case JS_Blocked: + case JS_WaitFD: + case JS_WaitSD: + case JS_WaitStoreRes: + case JS_WaitClientRes: + case JS_WaitJobRes: + case JS_WaitPriority: + case JS_WaitMaxJobs: + case JS_WaitStartTime: + case JS_Running: + cancel = true; + break; + case JS_Terminated: + case JS_ErrorTerminated: + case JS_Canceled: + cancel = false; + break; + default: + Emsg1(M_ABORT, 0, _("Unhandled job status code %d\n"), + jcr->JobStatus); + } + + Dmsg3(200, "MaxRunTime result: %scancel JCR %p (%s)\n", + cancel ? "" : "do not ", jcr, jcr->Job); + + return cancel; +} + /* * Run a job -- typically called by the scheduler, but may also * be called by the UA (Console program). 
@@ -63,6 +234,7 @@ void run_job(JCR *jcr) { int stat, errstat; + P(jcr->mutex); sm_check(__FILE__, __LINE__, True); init_msg(jcr, jcr->messages); create_unique_job_name(jcr, jcr->job->hdr.name); @@ -79,9 +251,7 @@ void run_job(JCR *jcr) /* Initialize termination condition variable */ if ((errstat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) { Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), strerror(errstat)); - set_jcr_job_status(jcr, JS_ErrorTerminated); - free_jcr(jcr); - return; + goto bail_out; } /* @@ -97,9 +267,7 @@ void run_job(JCR *jcr) if (jcr->db) { Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(jcr->db)); } - set_jcr_job_status(jcr, JS_ErrorTerminated); - free_jcr(jcr); - return; + goto bail_out; } Dmsg0(50, "DB opened\n"); @@ -109,9 +277,7 @@ void run_job(JCR *jcr) jcr->jr.JobStatus = jcr->JobStatus; if (!db_create_job_record(jcr, jcr->db, &jcr->jr)) { Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(jcr->db)); - set_jcr_job_status(jcr, JS_ErrorTerminated); - free_jcr(jcr); - return; + goto bail_out; } jcr->JobId = jcr->jr.JobId; ASSERT(jcr->jr.JobId > 0); @@ -125,10 +291,87 @@ void run_job(JCR *jcr) Emsg1(M_ABORT, 0, _("Could not add job queue: ERR=%s\n"), strerror(stat)); } Dmsg0(100, "Done run_job()\n"); + + V(jcr->mutex); + return; + +bail_out: + set_jcr_job_status(jcr, JS_ErrorTerminated); + V(jcr->mutex); + return; + +} + +/* + * Cancel a job -- typically called by the UA (Console program), but may also + * be called by the job watchdog. + * + * Returns: 1 if cancel appears to be successful + * 0 on failure. Message sent to ua->jcr. 
+ */ +int cancel_job(UAContext *ua, JCR *jcr) +{ + BSOCK *sd, *fd; + + switch (jcr->JobStatus) { + case JS_Created: + case JS_WaitJobRes: + case JS_WaitClientRes: + case JS_WaitStoreRes: + case JS_WaitPriority: + case JS_WaitMaxJobs: + case JS_WaitStartTime: + set_jcr_job_status(jcr, JS_Canceled); + bsendmsg(ua, _("JobId %d, Job %s marked to be canceled.\n"), + jcr->JobId, jcr->Job); + jobq_remove(&job_queue, jcr); /* attempt to remove it from queue */ + return 1; + + default: + set_jcr_job_status(jcr, JS_Canceled); + + /* Cancel File daemon */ + if (jcr->file_bsock) { + ua->jcr->client = jcr->client; + if (!connect_to_file_daemon(ua->jcr, 10, FDConnectTimeout, 1)) { + bsendmsg(ua, _("Failed to connect to File daemon.\n")); + return 0; + } + Dmsg0(200, "Connected to file daemon\n"); + fd = ua->jcr->file_bsock; + bnet_fsend(fd, "cancel Job=%s\n", jcr->Job); + while (bnet_recv(fd) >= 0) { + bsendmsg(ua, "%s", fd->msg); + } + bnet_sig(fd, BNET_TERMINATE); + bnet_close(fd); + ua->jcr->file_bsock = NULL; + } + + /* Cancel Storage daemon */ + if (jcr->store_bsock) { + ua->jcr->store = jcr->store; + if (!connect_to_storage_daemon(ua->jcr, 10, SDConnectTimeout, 1)) { + bsendmsg(ua, _("Failed to connect to Storage daemon.\n")); + return 0; + } + Dmsg0(200, "Connected to storage daemon\n"); + sd = ua->jcr->store_bsock; + bnet_fsend(sd, "cancel Job=%s\n", jcr->Job); + while (bnet_recv(sd) >= 0) { + bsendmsg(ua, "%s", sd->msg); + } + bnet_sig(sd, BNET_TERMINATE); + bnet_close(sd); + ua->jcr->store_bsock = NULL; + } + } + + return 1; } /* - * This is the engine called by job_add() when we were pulled + * This is the engine called by jobq.c:jobq_add() when we were pulled * from the work queue. 
* At this point, we are running in our own thread and all * necessary resources are allocated -- see jobq.c diff --git a/bacula/src/dird/jobq.c b/bacula/src/dird/jobq.c index eb4640efcd..b5026918aa 100755 --- a/bacula/src/dird/jobq.c +++ b/bacula/src/dird/jobq.c @@ -171,7 +171,10 @@ static void *sched_wait(void *arg) } wtime = jcr->sched_time - time(NULL); } + P(jcr->mutex); /* lock jcr */ jobq_add(jq, jcr); + V(jcr->mutex); + free_jcr(jcr); /* we are done with jcr */ Dmsg0(100, "Exit sched_wait\n"); return NULL; } @@ -180,6 +183,8 @@ static void *sched_wait(void *arg) /* * Add a job to the queue * jq is a queue that was created with jobq_init + * + * On entry jcr->mutex must be locked. * */ int jobq_add(jobq_t *jq, JCR *jcr) @@ -197,20 +202,27 @@ int jobq_add(jobq_t *jq, JCR *jcr) return EINVAL; } + jcr->use_count++; /* mark jcr in use by us */ + if (!job_canceled(jcr) && wtime > 0) { set_thread_concurrency(jq->max_workers + 2); sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt)); sched_pkt->jcr = jcr; sched_pkt->jq = jq; stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt); + if (stat != 0) { /* thread not created */ + jcr->use_count--; /* release jcr */ + } return stat; } if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) { + jcr->use_count--; /* release jcr */ return stat; } if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) { + jcr->use_count--; /* release jcr */ return ENOMEM; } item->jcr = jcr; @@ -248,13 +260,13 @@ int jobq_add(jobq_t *jq, JCR *jcr) } /* - * Remove a job from the job queue. Used only by cancel Console command. + * Remove a job from the job queue. Used only by cancel_job(). * jq is a queue that was created with jobq_init * work_item is an element of work * - * Note, it is "removed" by immediately calling a processing routine. - * if you want to cancel it, you need to provide some external means - * of doing so. + * Note, it is "removed" from the job queue. 
+ * If you want to cancel it, you need to provide some external means + * of doing so (e.g. pthread_kill()). */ int jobq_remove(jobq_t *jq, JCR *jcr) { @@ -454,6 +466,7 @@ static void *jobq_server(void *arg) Dmsg0(100, "Call to run new job\n"); V(jq->mutex); run_job(njcr); /* This creates a "new" job */ + free_jcr(njcr); /* release "new" jcr */ P(jq->mutex); Dmsg0(100, "Back from running new job.\n"); } diff --git a/bacula/src/dird/protos.h b/bacula/src/dird/protos.h index f05b8c0cd0..d9a9925525 100644 --- a/bacula/src/dird/protos.h +++ b/bacula/src/dird/protos.h @@ -87,6 +87,7 @@ extern void create_unique_job_name(JCR *jcr, char *base_name); extern void update_job_end_record(JCR *jcr); extern int get_or_create_client_record(JCR *jcr); extern void run_job(JCR *jcr); +extern int cancel_job(UAContext *ua, JCR *jcr); /* mountreq.c */ extern void mount_request(JCR *jcr, BSOCK *bs, char *buf); @@ -137,6 +138,7 @@ RUN *find_next_run(RUN *run, JOB *job, time_t &runtime); /* ua_server.c */ void bsendmsg(void *sock, char *fmt, ...); UAContext *new_ua_context(JCR *jcr); +JCR *create_control_jcr(char *base_name, int job_type); void free_ua_context(UAContext *ua); /* ua_select.c */ diff --git a/bacula/src/dird/run_conf.c b/bacula/src/dird/run_conf.c index 66fafec301..faabf7c9fa 100644 --- a/bacula/src/dird/run_conf.c +++ b/bacula/src/dird/run_conf.c @@ -412,6 +412,7 @@ void store_run(LEX *lc, struct res_items *item, int index, int pass) scan_err0(lc, _("Bad time specification.")); /* NOT REACHED */ } + /****FIXME**** convert to UTC */ set_bit(code, lrun.hour); lrun.minute = code2; have_hour = true; diff --git a/bacula/src/dird/ua_cmds.c b/bacula/src/dird/ua_cmds.c index 45f62fd5ec..37ba7ab57e 100644 --- a/bacula/src/dird/ua_cmds.c +++ b/bacula/src/dird/ua_cmds.c @@ -348,9 +348,8 @@ int automount_cmd(UAContext *ua, char *cmd) */ static int cancel_cmd(UAContext *ua, char *cmd) { - int i; + int i, ret; int njobs = 0; - BSOCK *sd, *fd; JCR *jcr = NULL; char 
JobName[MAX_NAME_LENGTH]; @@ -419,72 +418,18 @@ static int cancel_cmd(UAContext *ua, char *cmd) return 1; } } + /* NOTE! This increments the ref_count */ jcr = get_jcr_by_full_name(JobName); if (!jcr) { bsendmsg(ua, _("Job %s not found.\n"), JobName); return 1; } } - - switch (jcr->JobStatus) { - case JS_Created: - case JS_WaitJobRes: - case JS_WaitClientRes: - case JS_WaitStoreRes: - case JS_WaitPriority: - case JS_WaitMaxJobs: - case JS_WaitStartTime: - set_jcr_job_status(jcr, JS_Canceled); - bsendmsg(ua, _("JobId %d, Job %s marked to be canceled.\n"), - jcr->JobId, jcr->Job); - jobq_remove(&job_queue, jcr); /* attempt to remove it from queue */ - free_jcr(jcr); /* this decrements the use count only */ - return 1; - - default: - set_jcr_job_status(jcr, JS_Canceled); - - /* Cancel File daemon */ - if (jcr->file_bsock) { - ua->jcr->client = jcr->client; - if (!connect_to_file_daemon(ua->jcr, 10, FDConnectTimeout, 1)) { - bsendmsg(ua, _("Failed to connect to File daemon.\n")); - free_jcr(jcr); - return 1; - } - Dmsg0(200, "Connected to file daemon\n"); - fd = ua->jcr->file_bsock; - bnet_fsend(fd, "cancel Job=%s\n", jcr->Job); - while (bnet_recv(fd) >= 0) { - bsendmsg(ua, "%s", fd->msg); - } - bnet_sig(fd, BNET_TERMINATE); - bnet_close(fd); - ua->jcr->file_bsock = NULL; - } - /* Cancel Storage daemon */ - if (jcr->store_bsock) { - ua->jcr->store = jcr->store; - if (!connect_to_storage_daemon(ua->jcr, 10, SDConnectTimeout, 1)) { - bsendmsg(ua, _("Failed to connect to Storage daemon.\n")); - free_jcr(jcr); - return 1; - } - Dmsg0(200, "Connected to storage daemon\n"); - sd = ua->jcr->store_bsock; - bnet_fsend(sd, "cancel Job=%s\n", jcr->Job); - while (bnet_recv(sd) >= 0) { - bsendmsg(ua, "%s", sd->msg); - } - bnet_sig(sd, BNET_TERMINATE); - bnet_close(sd); - ua->jcr->store_bsock = NULL; - } - } + ret = cancel_job(ua, jcr); free_jcr(jcr); - return 1; + return ret; } /* diff --git a/bacula/src/dird/ua_run.c b/bacula/src/dird/ua_run.c index e18777a7e2..d8c96b821e 100644 
--- a/bacula/src/dird/ua_run.c +++ b/bacula/src/dird/ua_run.c @@ -325,7 +325,10 @@ int run_cmd(UAContext *ua, char *cmd) verify_job = job->verify_job; } - /* Create JCR to run job */ + /* + * Create JCR to run job. NOTE!!! after this point, free_jcr() + * before returning. + */ jcr = new_jcr(sizeof(JCR), dird_free_jcr); set_jcr_defaults(jcr, job); @@ -364,7 +367,7 @@ int run_cmd(UAContext *ua, char *cmd) } if (!jcr->replace) { bsendmsg(ua, _("Invalid replace option: %s\n"), replace); - return 0; + goto bail_out; } } else if (job->replace) { jcr->replace = job->replace; @@ -387,7 +390,7 @@ try_again: switch (jcr->JobType) { char ec1[30]; char dt[MAX_TIME_LENGTH]; - case JT_ADMIN: + case JT_ADMIN: bsendmsg(ua, _("Run %s job\n\ JobName: %s\n\ FileSet: %s\n\ @@ -402,29 +405,28 @@ Priority: %d\n"), NPRT(jcr->store->hdr.name), bstrutime(dt, sizeof(dt), jcr->sched_time), jcr->JobPriority); - jcr->JobLevel = L_FULL; - break; - case JT_BACKUP: - case JT_VERIFY: - if (level_name) { - /* Look up level name and pull code */ - found = 0; - for (i=0; joblevels[i].level_name; i++) { - if (strcasecmp(level_name, _(joblevels[i].level_name)) == 0) { - jcr->JobLevel = joblevels[i].level; - found = 1; - break; - } - } - if (!found) { - bsendmsg(ua, _("Level %s not valid.\n"), level_name); - free_jcr(jcr); - return 1; + jcr->JobLevel = L_FULL; + break; + case JT_BACKUP: + case JT_VERIFY: + if (level_name) { + /* Look up level name and pull code */ + found = 0; + for (i=0; joblevels[i].level_name; i++) { + if (strcasecmp(level_name, _(joblevels[i].level_name)) == 0) { + jcr->JobLevel = joblevels[i].level; + found = 1; + break; } } - level_name = NULL; - if (jcr->JobType == JT_BACKUP) { - bsendmsg(ua, _("Run %s job\n\ + if (!found) { + bsendmsg(ua, _("Level %s not valid.\n"), level_name); + goto bail_out; + } + } + level_name = NULL; + if (jcr->JobType == JT_BACKUP) { + bsendmsg(ua, _("Run %s job\n\ JobName: %s\n\ FileSet: %s\n\ Level: %s\n\ @@ -442,14 +444,14 @@ Priority: %d\n"), 
NPRT(jcr->pool->hdr.name), bstrutime(dt, sizeof(dt), jcr->sched_time), jcr->JobPriority); - } else { /* JT_VERIFY */ - char *Name; - if (jcr->job->verify_job) { - Name = jcr->job->verify_job->hdr.name; - } else { - Name = ""; - } - bsendmsg(ua, _("Run %s job\n\ + } else { /* JT_VERIFY */ + char *Name; + if (jcr->job->verify_job) { + Name = jcr->job->verify_job->hdr.name; + } else { + Name = ""; + } + bsendmsg(ua, _("Run %s job\n\ JobName: %s\n\ FileSet: %s\n\ Level: %s\n\ @@ -459,34 +461,33 @@ Pool: %s\n\ Verify Job: %s\n\ When: %s\n\ Priority: %d\n"), - _("Verify"), - job->hdr.name, - jcr->fileset->hdr.name, - level_to_str(jcr->JobLevel), - jcr->client->hdr.name, - jcr->store->hdr.name, - NPRT(jcr->pool->hdr.name), - Name, - bstrutime(dt, sizeof(dt), jcr->sched_time), - jcr->JobPriority); - } - break; - case JT_RESTORE: - if (jcr->RestoreJobId == 0 && !jcr->RestoreBootstrap) { - if (jid) { - jcr->RestoreJobId = atoi(jid); - } else { - if (!get_pint(ua, _("Please enter a JobId for restore: "))) { - free_jcr(jcr); - return 1; - } - jcr->RestoreJobId = ua->pint32_val; - } + _("Verify"), + job->hdr.name, + jcr->fileset->hdr.name, + level_to_str(jcr->JobLevel), + jcr->client->hdr.name, + jcr->store->hdr.name, + NPRT(jcr->pool->hdr.name), + Name, + bstrutime(dt, sizeof(dt), jcr->sched_time), + jcr->JobPriority); + } + break; + case JT_RESTORE: + if (jcr->RestoreJobId == 0 && !jcr->RestoreBootstrap) { + if (jid) { + jcr->RestoreJobId = atoi(jid); + } else { + if (!get_pint(ua, _("Please enter a JobId for restore: "))) { + goto bail_out; + } + jcr->RestoreJobId = ua->pint32_val; } - jcr->JobLevel = L_FULL; /* default level */ - Dmsg1(20, "JobId to restore=%d\n", jcr->RestoreJobId); - if (jcr->RestoreJobId == 0) { - bsendmsg(ua, _("Run Restore job\n\ + } + jcr->JobLevel = L_FULL; /* default level */ + Dmsg1(20, "JobId to restore=%d\n", jcr->RestoreJobId); + if (jcr->RestoreJobId == 0) { + bsendmsg(ua, _("Run Restore job\n\ JobName: %s\n\ Bootstrap: %s\n\ Where: %s\n\ @@ 
-496,17 +497,17 @@ Client: %s\n\ Storage: %s\n\ When: %s\n\ Priority: %d\n"), - job->hdr.name, - NPRT(jcr->RestoreBootstrap), - jcr->where?jcr->where:NPRT(job->RestoreWhere), - replace, - jcr->fileset->hdr.name, - jcr->client->hdr.name, - jcr->store->hdr.name, - bstrutime(dt, sizeof(dt), jcr->sched_time), - jcr->JobPriority); - } else { - bsendmsg(ua, _("Run Restore job\n\ + job->hdr.name, + NPRT(jcr->RestoreBootstrap), + jcr->where?jcr->where:NPRT(job->RestoreWhere), + replace, + jcr->fileset->hdr.name, + jcr->client->hdr.name, + jcr->store->hdr.name, + bstrutime(dt, sizeof(dt), jcr->sched_time), + jcr->JobPriority); + } else { + bsendmsg(ua, _("Run Restore job\n\ JobName: %s\n\ Bootstrap: %s\n\ Where: %s\n\ @@ -517,43 +518,40 @@ Storage: %s\n\ JobId: %s\n\ When: %s\n\ Priority: %d\n"), - job->hdr.name, - NPRT(jcr->RestoreBootstrap), - jcr->where?jcr->where:NPRT(job->RestoreWhere), - replace, - jcr->fileset->hdr.name, - jcr->client->hdr.name, - jcr->store->hdr.name, - jcr->RestoreJobId==0?"*None*":edit_uint64(jcr->RestoreJobId, ec1), - bstrutime(dt, sizeof(dt), jcr->sched_time), - jcr->JobPriority); - } - break; - default: - bsendmsg(ua, _("Unknown Job Type=%d\n"), jcr->JobType); - free_jcr(jcr); - return 0; + job->hdr.name, + NPRT(jcr->RestoreBootstrap), + jcr->where?jcr->where:NPRT(job->RestoreWhere), + replace, + jcr->fileset->hdr.name, + jcr->client->hdr.name, + jcr->store->hdr.name, + jcr->RestoreJobId==0?"*None*":edit_uint64(jcr->RestoreJobId, ec1), + bstrutime(dt, sizeof(dt), jcr->sched_time), + jcr->JobPriority); + } + break; + default: + bsendmsg(ua, _("Unknown Job Type=%d\n"), jcr->JobType); + goto bail_out; } /* Run without prompting? */ if (find_arg(ua, _("yes")) > 0) { Dmsg1(200, "Calling run_job job=%x\n", jcr->job); run_job(jcr); + free_jcr(jcr); /* release jcr */ bsendmsg(ua, _("Run command submitted.\n")); return 1; } if (!get_cmd(ua, _("OK to run? 
(yes/mod/no): "))) { - free_jcr(jcr); - return 0; /* do not run */ + goto bail_out; } /* * At user request modify parameters of job to be run. */ if (ua->cmd[0] == 0) { - bsendmsg(ua, _("Job not run.\n")); - free_jcr(jcr); - return 0; /* do not run */ + goto bail_out; } if (strncasecmp(ua->cmd, _("mod"), strlen(ua->cmd)) == 0) { FILE *fd; @@ -777,20 +775,19 @@ Priority: %d\n"), default: goto try_again; } - bsendmsg(ua, _("Job not run.\n")); - free_jcr(jcr); - return 0; /* error do no run Job */ + goto bail_out; } if (strncasecmp(ua->cmd, _("yes"), strlen(ua->cmd)) == 0) { Dmsg1(200, "Calling run_job job=%x\n", jcr->job); run_job(jcr); + free_jcr(jcr); /* release jcr */ bsendmsg(ua, _("Run command submitted.\n")); return 1; } +bail_out: bsendmsg(ua, _("Job not run.\n")); free_jcr(jcr); return 0; /* do not run */ - } diff --git a/bacula/src/dird/ua_server.c b/bacula/src/dird/ua_server.c index 75c08fc9d6..cd6a94ab70 100644 --- a/bacula/src/dird/ua_server.c +++ b/bacula/src/dird/ua_server.c @@ -92,20 +92,20 @@ static void *connect_thread(void *arg) } /* - * Create a Job Control Record for a console "job" + * Create a Job Control Record for a control "job", * filling in all the appropriate fields. */ -static JCR *create_console_jcr() +JCR *create_control_jcr(char *base_name, int job_type) { JCR *jcr; jcr = new_jcr(sizeof(JCR), dird_free_jcr); jcr->sd_auth_key = bstrdup("dummy"); /* dummy Storage daemon key */ - create_unique_job_name(jcr, "*Console*"); + create_unique_job_name(jcr, base_name); jcr->sched_time = jcr->start_time; - jcr->JobType = JT_CONSOLE; + jcr->JobType = job_type; jcr->JobLevel = L_FULL; jcr->JobStatus = JS_Running; - /* None of these are really defined for the Console, so we + /* None of these are really defined for control JCRs, so we * simply take the first of each one. This ensures that there * will be no null pointer references. 
*/ @@ -134,7 +134,7 @@ static void *handle_UA_client_request(void *arg) pthread_detach(pthread_self()); - jcr = create_console_jcr(); + jcr = create_control_jcr("*Console*", JT_CONSOLE); ua = new_ua_context(jcr); ua->UA_sock = UA_sock; diff --git a/bacula/src/dird/ua_status.c b/bacula/src/dird/ua_status.c index 819a2302b3..b71b8161a1 100644 --- a/bacula/src/dird/ua_status.c +++ b/bacula/src/dird/ua_status.c @@ -386,9 +386,14 @@ static void list_running_jobs(UAContext *ua) lock_jcr_chain(); for (jcr=NULL; (jcr=get_next_jcr(jcr)); njobs++) { if (jcr->JobId == 0) { /* this is us */ - bstrftime(dt, sizeof(dt), jcr->start_time); - strcpy(dt+7, dt+9); /* cut century */ - bsendmsg(ua, _("Console connected at %s\n"), dt); + /* this is a console or other control job. We only show console + * jobs in the status output. + */ + if (jcr->JobType == JT_CONSOLE) { + bstrftime(dt, sizeof(dt), jcr->start_time); + strcpy(dt+7, dt+9); /* cut century */ + bsendmsg(ua, _("Console connected at %s\n"), dt); + } njobs--; } free_locked_jcr(jcr); diff --git a/bacula/src/filed/filed.c b/bacula/src/filed/filed.c index aa8dbe07be..61c6ea2f3d 100644 --- a/bacula/src/filed/filed.c +++ b/bacula/src/filed/filed.c @@ -224,6 +224,8 @@ Without that I don't know who I am :-(\n"), configfile); start_watchdog(); /* start watchdog thread */ + init_jcr_subsystem(); /* start JCR watchdogs etc. 
*/ + if (inetd_request) { /* Socket is on fd 0 */ BSOCK *bs = init_bsock(NULL, 0, "client", "unknown client", me->FDport); @@ -241,8 +243,6 @@ Without that I don't know who I am :-(\n"), configfile); void terminate_filed(int sig) { - stop_watchdog(); - if (configfile != NULL) { free(configfile); } @@ -252,6 +252,7 @@ void terminate_filed(int sig) delete_pid_file(me->pid_directory, "bacula-fd", me->FDport); free_config_resources(); term_msg(); + stop_watchdog(); close_memory_pool(); /* release free memory in pool */ sm_dump(False); /* dump orphaned buffers */ exit(1); diff --git a/bacula/src/jcr.h b/bacula/src/jcr.h index 8a47e2e6c3..6f8994fe51 100644 --- a/bacula/src/jcr.h +++ b/bacula/src/jcr.h @@ -51,6 +51,7 @@ #define JT_VERIFY 'V' /* Verify Job */ #define JT_RESTORE 'R' /* Restore Job */ #define JT_CONSOLE 'C' /* console program */ +#define JT_SYSTEM 'S' /* internal system "job" */ #define JT_ADMIN 'D' /* admin job */ #define JT_ARCHIVE 'A' @@ -273,6 +274,7 @@ extern dlist *last_jobs; /* The following routines are found in lib/jcr.c */ +extern bool init_jcr_subsystem(void); extern JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr); extern void free_locked_jcr(JCR *jcr); extern JCR *get_jcr_by_id(uint32_t JobId); diff --git a/bacula/src/lib/Makefile.in b/bacula/src/lib/Makefile.in index dcdb32ab33..28cab1eed2 100644 --- a/bacula/src/lib/Makefile.in +++ b/bacula/src/lib/Makefile.in @@ -39,7 +39,7 @@ LIBSRCS = alloc.c attr.c base64.c bsys.c bget_msg.c \ md5.c message.c mem_pool.c parse_conf.c \ queue.c rwlock.c scan.c serial.c sha1.c \ semlock.c signal.c smartall.c tree.c \ - util.c var.c watchdog.c workq.c + util.c var.c watchdog.c workq.c timers.c LIBOBJS = alloc.o attr.o base64.o bsys.o bget_msg.o \ @@ -50,7 +50,7 @@ LIBOBJS = alloc.o attr.o base64.o bsys.o bget_msg.o \ md5.o message.o mem_pool.o parse_conf.o \ queue.o rwlock.o scan.o serial.o sha1.o \ semlock.o signal.o smartall.o tree.o \ - util.o var.o watchdog.o workq.o + util.o var.o watchdog.o 
workq.o timers.o EXTRAOBJS = @OBJLIST@ diff --git a/bacula/src/lib/jcr.c b/bacula/src/lib/jcr.c index 6936b63854..017c0f37ba 100755 --- a/bacula/src/lib/jcr.c +++ b/bacula/src/lib/jcr.c @@ -31,7 +31,12 @@ #include "bacula.h" #include "jcr.h" -extern void timeout_handler(int sig); +/* External variables we reference */ +extern time_t watchdog_time; + +/* Forward referenced functions */ +static void timeout_handler(int sig); +static void jcr_timeout_check(watchdog_t *self); struct s_last_job last_job; /* last job run by this daemon */ dlist *last_jobs; @@ -40,7 +45,7 @@ dlist *last_jobs; static JCR *jobs = NULL; /* pointer to JCR chain */ /* Mutex for locking various jcr chains while updating */ -static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t jcr_chain_mutex = PTHREAD_MUTEX_INITIALIZER; void init_last_jobs_list() { @@ -59,12 +64,14 @@ void term_last_jobs_list() void lock_last_jobs_list() { - P(mutex); + /* Use jcr chain mutex */ + P(jcr_chain_mutex); } void unlock_last_jobs_list() { - V(mutex); + /* Use jcr chain mutex */ + V(jcr_chain_mutex); } /* @@ -98,14 +105,14 @@ JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr) sigfillset(&sigtimer.sa_mask); sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL); - P(mutex); + P(jcr_chain_mutex); jcr->prev = NULL; jcr->next = jobs; if (jobs) { jobs->prev = jcr; } jobs = jcr; - V(mutex); + V(jcr_chain_mutex); return jcr; } @@ -219,11 +226,11 @@ void free_jcr(JCR *jcr) #endif struct s_last_job *je; - P(mutex); + P(jcr_chain_mutex); jcr->use_count--; /* decrement use count */ Dmsg3(200, "Dec jcr 0x%x use_count=%d jobid=%d\n", jcr, jcr->use_count, jcr->JobId); if (jcr->use_count > 0) { /* if in use */ - V(mutex); + V(jcr_chain_mutex); Dmsg2(200, "jcr 0x%x use_count=%d\n", jcr, jcr->use_count); return; } @@ -247,7 +254,7 @@ void free_jcr(JCR *jcr) last_job.JobId = 0; /* zap last job */ } close_msg(NULL); /* flush any daemon messages */ - V(mutex); + V(jcr_chain_mutex); Dmsg0(200, "Exit free_jcr\n"); } 
@@ -280,15 +287,17 @@ JCR *get_jcr_by_id(uint32_t JobId) { JCR *jcr; - P(mutex); + P(jcr_chain_mutex); /* lock chain */ for (jcr = jobs; jcr; jcr=jcr->next) { if (jcr->JobId == JobId) { + P(jcr->mutex); jcr->use_count++; + V(jcr->mutex); Dmsg2(200, "Inc jcr 0x%x use_count=%d\n", jcr, jcr->use_count); break; } } - V(mutex); + V(jcr_chain_mutex); return jcr; } @@ -301,16 +310,18 @@ JCR *get_jcr_by_session(uint32_t SessionId, uint32_t SessionTime) { JCR *jcr; - P(mutex); + P(jcr_chain_mutex); for (jcr = jobs; jcr; jcr=jcr->next) { if (jcr->VolSessionId == SessionId && jcr->VolSessionTime == SessionTime) { + P(jcr->mutex); jcr->use_count++; + V(jcr->mutex); Dmsg2(200, "Inc jcr 0x%x use_count=%d\n", jcr, jcr->use_count); break; } } - V(mutex); + V(jcr_chain_mutex); return jcr; } @@ -330,16 +341,18 @@ JCR *get_jcr_by_partial_name(char *Job) if (!Job) { return NULL; } - P(mutex); + P(jcr_chain_mutex); len = strlen(Job); for (jcr = jobs; jcr; jcr=jcr->next) { if (strncmp(Job, jcr->Job, len) == 0) { + P(jcr->mutex); jcr->use_count++; + V(jcr->mutex); Dmsg2(200, "Inc jcr 0x%x use_count=%d\n", jcr, jcr->use_count); break; } } - V(mutex); + V(jcr_chain_mutex); return jcr; } @@ -357,15 +370,17 @@ JCR *get_jcr_by_full_name(char *Job) if (!Job) { return NULL; } - P(mutex); + P(jcr_chain_mutex); for (jcr = jobs; jcr; jcr=jcr->next) { if (strcmp(jcr->Job, Job) == 0) { + P(jcr->mutex); jcr->use_count++; + V(jcr->mutex); Dmsg2(200, "Inc jcr 0x%x use_count=%d\n", jcr, jcr->use_count); break; } } - V(mutex); + V(jcr_chain_mutex); return jcr; } @@ -392,7 +407,7 @@ void set_jcr_job_status(JCR *jcr, int JobStatus) */ void lock_jcr_chain() { - P(mutex); + P(jcr_chain_mutex); } /* @@ -400,7 +415,7 @@ void lock_jcr_chain() */ void unlock_jcr_chain() { - V(mutex); + V(jcr_chain_mutex); } @@ -414,8 +429,92 @@ JCR *get_next_jcr(JCR *jcr) rjcr = jcr->next; } if (rjcr) { + P(rjcr->mutex); rjcr->use_count++; + V(rjcr->mutex); Dmsg1(200, "Inc jcr use_count=%d\n", rjcr->use_count); } return rjcr; } 
+
+#ifndef JCR_TIMEOUT_CHECK_INTERVAL
+#define JCR_TIMEOUT_CHECK_INTERVAL 30   /* seconds between JCR timeout scans */
+#endif
+
+/*
+ * Create and register the periodic watchdog that scans the JCR chain
+ *  for sockets whose timer has run past its timeout.
+ *
+ *  Returns: true  if the watchdog was created and passed to
+ *                 register_watchdog()
+ *           false if watchdog_new() could not allocate it
+ */
+bool init_jcr_subsystem(void)
+{
+   watchdog_t *wd = watchdog_new();
+
+   if (wd == NULL) {                /* watchdog_new() gives NULL when malloc fails */
+      return false;
+   }
+   wd->one_shot = false;            /* keep firing, this is a periodic check */
+   wd->interval = JCR_TIMEOUT_CHECK_INTERVAL;
+   wd->callback = jcr_timeout_check;
+
+   register_watchdog(wd);           /* return value ignored, as before */
+
+   return true;
+}
+
+/*
+ * Test one socket's timer against the watchdog clock.
+ *
+ *  When fd is non-NULL, its timer is running (timer_start != 0) and
+ *  more than fd->timeout seconds have elapsed on watchdog_time, the
+ *  timer is switched off, the socket is flagged as timed out and the
+ *  elapsed time is stored in *elapsed.
+ *
+ *  Returns: true  if the socket was just marked as timed out
+ *           false otherwise (*elapsed is left untouched)
+ */
+static bool bsock_timer_expired(BSOCK *fd, time_t *elapsed)
+{
+   time_t timer_start;
+
+   if (!fd) {
+      return false;
+   }
+   timer_start = fd->timer_start;   /* read once, then work on the copy */
+   if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
+      fd->timer_start = 0;          /* turn off timer */
+      fd->timed_out = TRUE;
+      *elapsed = watchdog_time - timer_start;
+      return true;
+   }
+   return false;
+}
+
+/*
+ * Watchdog callback: walk through all JCRs checking if any one is
+ *  blocked for more than the specified max time on its Storage daemon,
+ *  File daemon or Director socket. For each expired socket an error is
+ *  logged against the job and TIMEOUT_SIGNAL is sent to the job thread.
+ *
+ *  self is the watchdog that fired; it is not used here.
+ */
+static void jcr_timeout_check(watchdog_t *self)
+{
+   JCR *jcr;
+   time_t secs;
+
+   Dmsg0(200, "Start JCR timeout checks\n");
+
+   lock_jcr_chain();
+   for (jcr=NULL; (jcr=get_next_jcr(jcr)); ) {
+      free_locked_jcr(jcr);         /* OK to free now cuz chain is locked */
+      if (jcr->JobId == 0) {
+         continue;
+      }
+      /* The elapsed time is cast to int because the message uses %d */
+      if (bsock_timer_expired(jcr->store_bsock, &secs)) {
+         Jmsg(jcr, M_ERROR, 0, _(
+"Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"),
+            (int)secs);
+         pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
+      }
+      if (bsock_timer_expired(jcr->file_bsock, &secs)) {
+         Jmsg(jcr, M_ERROR, 0, _(
+"Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"),
+            (int)secs);
+         pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
+      }
+      if (bsock_timer_expired(jcr->dir_bsock, &secs)) {
+         Jmsg(jcr, M_ERROR, 0, _(
+"Watchdog sending kill after %d secs to thread stalled reading Director.\n"),
+            (int)secs);
+         pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
+      }
+   }
+   unlock_jcr_chain();
+
+   Dmsg0(200, "Finished JCR timeout checks\n");
+}
+ +/* + * Timeout signal comes here + */ +void timeout_handler(int sig) +{ + return; /* thus interrupting the function */ +} diff --git a/bacula/src/lib/lib.h b/bacula/src/lib/lib.h index dd6f07c265..00934170bf 100644 --- a/bacula/src/lib/lib.h +++ b/bacula/src/lib/lib.h @@ -51,6 +51,7 @@ #include "sha1.h" #include "tree.h" #include "watchdog.h" +#include "timers.h" #include "bpipe.h" #include "attr.h" #include "var.h" diff --git a/bacula/src/lib/protos.h b/bacula/src/lib/protos.h index c04472e5b1..b9c0296677 100644 --- a/bacula/src/lib/protos.h +++ b/bacula/src/lib/protos.h @@ -199,7 +199,13 @@ void set_working_directory(char *wd); /* watchdog.c */ int start_watchdog(void); int stop_watchdog(void); -btimer_t *start_child_timer(pid_t pid, uint32_t wait); +watchdog_t *watchdog_new(void); +bool register_watchdog(watchdog_t *wd); +bool unregister_watchdog(watchdog_t *wd); +bool unregister_watchdog_unlocked(watchdog_t *wd); + +/* timers.c */ +btimer_id start_child_timer(pid_t pid, uint32_t wait); void stop_child_timer(btimer_id wid); btimer_id start_thread_timer(pthread_t tid, uint32_t wait); void stop_thread_timer(btimer_id wid); diff --git a/bacula/src/lib/watchdog.c b/bacula/src/lib/watchdog.c index 14fda2273b..3d2413685c 100755 --- a/bacula/src/lib/watchdog.c +++ b/bacula/src/lib/watchdog.c @@ -30,31 +30,34 @@ #include "bacula.h" #include "jcr.h" +/* This breaks Kern's #include rules, but I don't want to put it into bacula.h + * until it has been discussed with him */ +#include "bsd_queue.h" + /* Exported globals */ time_t watchdog_time; /* this has granularity of SLEEP_TIME */ -#define SLEEP_TIME 30 /* examine things every 30 seconds */ +#define SLEEP_TIME 1 /* examine things every second */ /* Forward referenced functions */ -static void *btimer_thread(void *arg); -static void stop_btimer(btimer_id wid); -static btimer_id btimer_start_common(uint32_t wait); +static void *watchdog_thread(void *arg); /* Static globals */ static pthread_mutex_t mutex = 
PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t timer = PTHREAD_COND_INITIALIZER; static int quit; -static btimer_t *timer_chain = NULL; +static bool wd_is_init = false; +/* Forward referenced callback functions */ +static void callback_child_timer(watchdog_t *self); +static void callback_thread_timer(watchdog_t *self); +static pthread_t wd_tid; -/* - * Timeout signal comes here - */ -void timeout_handler(int sig) -{ - return; /* thus interrupting the function */ -} - +/* Static globals */ +static TAILQ_HEAD(/* no struct */, s_watchdog_t) wd_queue = + TAILQ_HEAD_INITIALIZER(wd_queue); +static TAILQ_HEAD(/* no struct */, s_watchdog_t) wd_inactive = + TAILQ_HEAD_INITIALIZER(wd_inactive); /* * Start watchdog thread @@ -65,18 +68,14 @@ void timeout_handler(int sig) int start_watchdog(void) { int stat; - pthread_t wdid; - struct sigaction sigtimer; - - sigtimer.sa_flags = 0; - sigtimer.sa_handler = timeout_handler; - sigfillset(&sigtimer.sa_mask); - sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL); + + Dmsg0(200, "Initialising NicB-hacked watchdog thread\n"); watchdog_time = time(NULL); quit = FALSE; - if ((stat = pthread_create(&wdid, NULL, btimer_thread, (void *)NULL)) != 0) { + if ((stat = pthread_create(&wd_tid, NULL, watchdog_thread, NULL)) != 0) { return stat; } + wd_is_init = true; return 0; } @@ -89,223 +88,161 @@ int start_watchdog(void) int stop_watchdog(void) { int stat; + watchdog_t *p, *n; - quit = TRUE; - P(mutex); - if ((stat = pthread_cond_signal(&timer)) != 0) { - V(mutex); - return stat; + if (!wd_is_init) { + Emsg0(M_ABORT, 0, "BUG! stop_watchdog called before start_watchdog\n"); } - V(mutex); - return 0; -} - - -/* - * This is the actual watchdog thread. 
- */ -static void *btimer_thread(void *arg) -{ - JCR *jcr; - BSOCK *fd; - btimer_t *wid; - - Dmsg0(200, "Start watchdog thread\n"); - pthread_detach(pthread_self()); - for ( ;!quit; ) { - time_t timer_start, now; + Dmsg0(200, "Sending stop signal to NicB-hacked watchdog thread\n"); + P(mutex); + quit = true; + stat = pthread_cond_signal(&timer); + V(mutex); - Dmsg0(200, "Top of watchdog loop\n"); + wd_is_init = false; - watchdog_time = time(NULL); /* update timer */ - - /* Walk through all JCRs checking if any one is - * blocked for more than specified max time. - */ - lock_jcr_chain(); - for (jcr=NULL; (jcr=get_next_jcr(jcr)); ) { - free_locked_jcr(jcr); - if (jcr->JobId == 0) { - continue; - } - fd = jcr->store_bsock; - if (fd) { - timer_start = fd->timer_start; - if (timer_start && (watchdog_time - timer_start) > fd->timeout) { - fd->timer_start = 0; /* turn off timer */ - fd->timed_out = TRUE; - Jmsg(jcr, M_ERROR, 0, _( -"Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"), - watchdog_time - timer_start); - pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL); - } - } - fd = jcr->file_bsock; - if (fd) { - timer_start = fd->timer_start; - if (timer_start && (watchdog_time - timer_start) > fd->timeout) { - fd->timer_start = 0; /* turn off timer */ - fd->timed_out = TRUE; - Jmsg(jcr, M_ERROR, 0, _( -"Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"), - watchdog_time - timer_start); - pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL); - } - } - fd = jcr->dir_bsock; - if (fd) { - timer_start = fd->timer_start; - if (timer_start && (watchdog_time - timer_start) > fd->timeout) { - fd->timer_start = 0; /* turn off timer */ - fd->timed_out = TRUE; - Jmsg(jcr, M_ERROR, 0, _( -"Watchdog sending kill after %d secs to thread stalled reading Director.\n"), - watchdog_time - timer_start); - pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL); - } - } + stat = pthread_join(wd_tid, NULL); + TAILQ_FOREACH_SAFE(p, &wd_queue, qe, n) 
{ + TAILQ_REMOVE(&wd_queue, p, qe); + if (p->destructor != NULL) { + p->destructor(p); } - unlock_jcr_chain(); - - Dmsg0(200, "Watchdog sleep.\n"); - bmicrosleep(SLEEP_TIME, 0); - now = time(NULL); + free(p); + } - /* - * Now handle child and thread timers set by the code. - */ - /* Walk child chain killing off any process overdue */ - P(mutex); - for (wid = timer_chain; wid; wid=wid->next) { - int killed = FALSE; - /* First ask him politely to go away */ - if (!wid->killed && now > (wid->start_time + wid->wait)) { -// Dmsg1(000, "Watchdog sigterm pid=%d\n", wid->pid); - if (wid->type == TYPE_CHILD) { - kill(wid->pid, SIGTERM); - killed = TRUE; - } else { - Dmsg1(200, "watchdog kill thread %d\n", wid->tid); - pthread_kill(wid->tid, TIMEOUT_SIGNAL); - wid->killed = TRUE; - } - } - /* If we asked a child to die, wait 3 seconds and slam him */ - if (killed) { - btimer_t *wid1; - bmicrosleep(3, 0); - for (wid1 = timer_chain; wid1; wid1=wid1->next) { - if (wid->type == TYPE_CHILD && - !wid1->killed && now > (wid1->start_time + wid1->wait)) { - kill(wid1->pid, SIGKILL); -// Dmsg1(000, "Watchdog killed pid=%d\n", wid->pid); - wid1->killed = TRUE; - } - } - } + TAILQ_FOREACH_SAFE(p, &wd_inactive, qe, n) { + TAILQ_REMOVE(&wd_inactive, p, qe); + if (p->destructor != NULL) { + p->destructor(p); } - V(mutex); - } /* end of big for loop */ + free(p); + } - Dmsg0(200, "End watchdog\n"); - return NULL; + return stat; } -/* - * Start a timer on a child process of pid, kill it after wait seconds. - * NOTE! Granularity is SLEEP_TIME (i.e. 
30 seconds) - * - * Returns: btimer_id (pointer to btimer_t struct) on success - * NULL on failure - */ -btimer_id start_child_timer(pid_t pid, uint32_t wait) +watchdog_t *watchdog_new(void) { - btimer_t *wid; - wid = btimer_start_common(wait); - wid->pid = pid; - wid->type = TYPE_CHILD; - Dmsg2(200, "Start child timer 0x%x for %d secs.\n", wid, wait); - return wid; -} + watchdog_t *wd = (watchdog_t *) malloc(sizeof(watchdog_t)); -/* - * Start a timer on a thread. kill it after wait seconds. - * NOTE! Granularity is SLEEP_TIME (i.e. 30 seconds) - * - * Returns: btimer_id (pointer to btimer_t struct) on success - * NULL on failure - */ -btimer_id start_thread_timer(pthread_t tid, uint32_t wait) -{ - btimer_t *wid; - wid = btimer_start_common(wait); - wid->tid = tid; - wid->type = TYPE_PTHREAD; - Dmsg2(200, "Start thread timer 0x%x for %d secs.\n", wid, wait); - return wid; + if (!wd_is_init) { + Emsg0(M_ABORT, 0, "BUG! watchdog_new called before start_watchdog\n"); + } + + if (wd == NULL) { + return NULL; + } + wd->one_shot = true; + wd->interval = 0; + wd->callback = NULL; + wd->destructor = NULL; + wd->data = NULL; + + return wd; } -static btimer_id btimer_start_common(uint32_t wait) +bool register_watchdog(watchdog_t *wd) { - btimer_id wid = (btimer_id)malloc(sizeof(btimer_t)); + if (!wd_is_init) { + Emsg0(M_ABORT, 0, "BUG! register_watchdog called before start_watchdog\n"); + } + if (wd->callback == NULL) { + Emsg1(M_ABORT, 0, "BUG! Watchdog %p has NULL callback\n", wd); + } + if (wd->interval == 0) { + Emsg1(M_ABORT, 0, "BUG! 
Watchdog %p has zero interval\n", wd); + } P(mutex); - /* Chain it into timer_chain as the first item */ - wid->prev = NULL; - wid->next = timer_chain; - if (timer_chain) { - timer_chain->prev = wid; - } - timer_chain = wid; - wid->start_time = time(NULL); - wid->wait = wait; - wid->killed = FALSE; + wd->next_fire = watchdog_time + wd->interval; + TAILQ_INSERT_TAIL(&wd_queue, wd, qe); + Dmsg3(200, "Registered watchdog %p, interval %d%s\n", + wd, wd->interval, wd->one_shot ? " one shot" : ""); V(mutex); - return wid; -} -/* - * Stop child timer - */ -void stop_child_timer(btimer_id wid) -{ - Dmsg2(200, "Stop child timer 0x%x for %d secs.\n", wid, wid->wait); - stop_btimer(wid); + return false; } -/* - * Stop thread timer - */ -void stop_thread_timer(btimer_id wid) +bool unregister_watchdog_unlocked(watchdog_t *wd) { - if (!wid) { - return; + watchdog_t *p, *n; + + if (!wd_is_init) { + Emsg0(M_ABORT, 0, "BUG! unregister_watchdog_unlocked called before start_watchdog\n"); } - Dmsg2(200, "Stop thread timer 0x%x for %d secs.\n", wid, wid->wait); - stop_btimer(wid); -} + TAILQ_FOREACH_SAFE(p, &wd_queue, qe, n) { + if (wd == p) { + TAILQ_REMOVE(&wd_queue, wd, qe); + Dmsg1(200, "Unregistered watchdog %p\n", wd); + return true; + } + } -/* - * Stop btimer - */ -static void stop_btimer(btimer_id wid) + TAILQ_FOREACH_SAFE(p, &wd_inactive, qe, n) { + if (wd == p) { + TAILQ_REMOVE(&wd_inactive, wd, qe); + Dmsg1(200, "Unregistered inactive watchdog %p\n", wd); + return true; + } + } + + Dmsg1(200, "Failed to unregister watchdog %p\n", wd); + + return false; +} + +bool unregister_watchdog(watchdog_t *wd) { - if (wid == NULL) { - Emsg0(M_ABORT, 0, _("NULL btimer_id.\n")); + bool ret; + + if (!wd_is_init) { + Emsg0(M_ABORT, 0, "BUG! 
unregister_watchdog called before start_watchdog\n"); } + P(mutex); - /* Remove wid from timer_chain */ - if (!wid->prev) { /* if no prev */ - timer_chain = wid->next; /* set new head */ - } else { - wid->prev->next = wid->next; /* update prev */ - } - if (wid->next) { - wid->next->prev = wid->prev; /* unlink it */ - } + ret = unregister_watchdog_unlocked(wd); V(mutex); - free(wid); + + return ret; +} + +static void *watchdog_thread(void *arg) +{ + Dmsg0(200, "NicB-reworked watchdog thread entered\n"); + + while (true) { + watchdog_t *p, *n; + + P(mutex); + if (quit) { + V(mutex); + break; + } + + watchdog_time = time(NULL); + + TAILQ_FOREACH_SAFE(p, &wd_queue, qe, n) { + if (p->next_fire < watchdog_time) { + /* Run the callback */ + p->callback(p); + + /* Reschedule (or move to inactive list if it's a one-shot timer) */ + if (p->one_shot) { + TAILQ_REMOVE(&wd_queue, p, qe); + TAILQ_INSERT_TAIL(&wd_inactive, p, qe); + } else { + p->next_fire = watchdog_time + p->interval; + } + } + } + V(mutex); + bmicrosleep(SLEEP_TIME, 0); + } + + Dmsg0(200, "NicB-reworked watchdog thread exited\n"); + + return NULL; } diff --git a/bacula/src/lib/watchdog.h b/bacula/src/lib/watchdog.h index 8d39d38bda..aaae0c74cc 100644 --- a/bacula/src/lib/watchdog.h +++ b/bacula/src/lib/watchdog.h @@ -29,15 +29,18 @@ #define TIMEOUT_SIGNAL SIGUSR2 -typedef struct s_btimer_t { - struct s_btimer_t *next; - struct s_btimer_t *prev; - time_t start_time; - int32_t wait; - pid_t pid; /* process id if TYPE_CHILD */ - int killed; - int type; - pthread_t tid; /* thread id if TYPE_PTHREAD */ -} btimer_t; - -#define btimer_id btimer_t * +/* This breaks Kern's #include rules, but I don't want to put it into bacula.h + * until it has been discussed with him */ +#include "bsd_queue.h" + +struct s_watchdog_t { + bool one_shot; + time_t interval; + void (*callback)(struct s_watchdog_t *wd); + void (*destructor)(struct s_watchdog_t *wd); + void *data; + /* Private data below - don't touch outside of watchdog.c 
*/ + TAILQ_ENTRY(s_watchdog_t) qe; + time_t next_fire; +}; +typedef struct s_watchdog_t watchdog_t; diff --git a/bacula/src/stored/stored.c b/bacula/src/stored/stored.c index a83e70bd01..c3690464dc 100644 --- a/bacula/src/stored/stored.c +++ b/bacula/src/stored/stored.c @@ -209,6 +209,8 @@ int main (int argc, char *argv[]) start_watchdog(); /* start watchdog thread */ + init_jcr_subsystem(); /* start JCR watchdogs etc. */ + /* * Sleep a bit to give device thread a chance to lock the resource * chain before we start the server. @@ -372,7 +374,6 @@ void terminate_stored(int sig) } delete_pid_file(me->pid_directory, "bacula-sd", me->SDport); - stop_watchdog(); Dmsg1(200, "In terminate_stored() sig=%d\n", sig); @@ -392,6 +393,7 @@ void terminate_stored(int sig) print_memory_pool_stats(); } term_msg(); + stop_watchdog(); close_memory_pool(); sm_dump(False); /* dump orphaned buffers */ diff --git a/bacula/src/version.h b/bacula/src/version.h index b821f338d5..d3295c2b86 100644 --- a/bacula/src/version.h +++ b/bacula/src/version.h @@ -2,8 +2,8 @@ #undef VERSION #define VERSION "1.33" #define VSTRING "1" -#define BDATE "20 Dec 2003" -#define LSMDATE "20Dec03" +#define BDATE "22 Dec 2003" +#define LSMDATE "22Dec03" /* Debug flags */ #undef DEBUG -- 2.39.5