- Add subsections to the Disaster Recovery index section.
For 1.33
+- If a tape is recycled while it is mounted, Stanislav Tvrudy must do an
+ additional mount to deblock the job.
- Notes for integrating Nic's code:
- Most likely jcr crash is the free_jcr(). The ref count was not incremented
in the watchdog call to cancel_job() as it is in the UA -- see
start_watchdog(); /* start network watchdog thread */
+ init_jcr_subsystem(); /* start JCR watchdogs etc. */
+
init_job_server(director->MaxConcurrentJobs);
Dmsg0(200, "wait for next job\n");
/* Main loop -- call scheduler to get next job to run */
while ((jcr = wait_for_next_job(runjob))) {
run_job(jcr); /* run job */
+ free_jcr(jcr); /* release jcr */
if (runjob) { /* command line, run a single job? */
break; /* yes, terminate */
}
already_here = TRUE;
delete_pid_file(director->pid_directory, "bacula-dir",
director->DIRport);
- stop_watchdog();
// signal(SIGCHLD, SIG_IGN); /* don't worry about children now */
term_scheduler();
if (runjob) {
free_config_resources();
term_ua_server();
term_msg(); /* terminate message handler */
+ stop_watchdog();
close_memory_pool(); /* release free memory in pool */
sm_dump(False);
exit(sig != 0);
{"replace", store_replace, ITEM(res_job.replace), 0, ITEM_DEFAULT, REPLACE_ALWAYS},
{"bootstrap",store_dir, ITEM(res_job.RestoreBootstrap), 0, 0, 0},
{"maxruntime", store_time, ITEM(res_job.MaxRunTime), 0, 0, 0},
+ {"maxwaittime", store_time, ITEM(res_job.MaxWaitTime), 0, 0, 0},
{"maxstartdelay", store_time,ITEM(res_job.MaxStartDelay), 0, 0, 0},
{"prefixlinks", store_yesno, ITEM(res_job.PrefixLinks), 1, ITEM_DEFAULT, 0},
{"prunejobs", store_yesno, ITEM(res_job.PruneJobs), 1, ITEM_DEFAULT, 0},
char *WriteBootstrap; /* Where to write bootstrap Job updates */
int replace; /* How (overwrite, ..) */
utime_t MaxRunTime; /* max run time in seconds */
+ utime_t MaxWaitTime; /* max blocking time in seconds */
utime_t MaxStartDelay; /* max start delay in seconds */
int PrefixLinks; /* prefix soft links with Where path */
int PruneJobs; /* Force pruning of Jobs */
/* Forward referenced subroutines */
static void *job_thread(void *arg);
+static void job_monitor_watchdog(watchdog_t *self);
+static void job_monitor_destructor(watchdog_t *self);
+static bool job_check_maxwaittime(JCR *control_jcr, JCR *jcr);
+static bool job_check_maxruntime(JCR *control_jcr, JCR *jcr);
/* Exported subroutines */
-
/* Imported subroutines */
extern void term_scheduler();
extern void term_ua_server();
extern int do_restore(JCR *jcr);
extern int do_verify(JCR *jcr);
+/* Imported variables */
+extern time_t watchdog_time;
+
jobq_t job_queue;
void init_job_server(int max_workers)
{
int stat;
+ watchdog_t *wd;
+
if ((stat = jobq_init(&job_queue, max_workers, job_thread)) != 0) {
Emsg1(M_ABORT, 0, _("Could not init job queue: ERR=%s\n"), strerror(stat));
}
+ if ((wd = watchdog_new()) == NULL) {
+ Emsg0(M_ABORT, 0, _("Could not init job monitor watchdogs\n"));
+ }
+ wd->callback = job_monitor_watchdog;
+ wd->destructor = job_monitor_destructor;
+ wd->one_shot = false;
+ wd->interval = 60;
+ wd->data = create_control_jcr("*JobMonitor*", JT_SYSTEM);
+ register_watchdog(wd);
+
return;
}
+static void job_monitor_destructor(watchdog_t *self)
+{
+ JCR *control_jcr = (JCR *) self->data;
+
+ free_jcr(control_jcr);
+}
+
+static void job_monitor_watchdog(watchdog_t *self)
+{
+ JCR *control_jcr, *jcr;
+
+ control_jcr = (JCR *) self->data;
+
+ Dmsg1(200, "job_monitor_watchdog %p called\n", self);
+
+ lock_jcr_chain();
+
+ for (jcr = NULL; (jcr = get_next_jcr(jcr)); /* nothing */) {
+ bool cancel;
+
+ if (jcr->JobId == 0) {
+ Dmsg2(200, "Skipping JCR %p (%s) with JobId 0\n",
+ jcr, jcr->Job);
+ /* Keep reference counts correct */
+ free_locked_jcr(jcr);
+ continue;
+ }
+
+ /* check MaxWaitTime */
+ cancel = job_check_maxwaittime(control_jcr, jcr);
+
+ /* check MaxRunTime */
+ cancel |= job_check_maxruntime(control_jcr, jcr);
+
+ if (cancel) {
+ Dmsg3(200, "Cancelling JCR %p jobid %d (%s)\n",
+ jcr, jcr->JobId, jcr->Job);
+
+ UAContext *ua = new_ua_context(jcr);
+ ua->jcr = control_jcr;
+ cancel_job(ua, jcr);
+ free_ua_context(ua);
+
+ Dmsg1(200, "Have cancelled JCR %p\n", jcr);
+ }
+
+ /* Keep reference counts correct */
+ free_locked_jcr(jcr);
+ }
+ unlock_jcr_chain();
+}
+
+static bool job_check_maxwaittime(JCR *control_jcr, JCR *jcr)
+{
+ bool cancel = false;
+
+ if (jcr->job->MaxWaitTime == 0) {
+ return false;
+ }
+ if ((watchdog_time - jcr->start_time) < jcr->job->MaxWaitTime) {
+ Dmsg3(200, "Job %p (%s) with MaxWaitTime %d not expired\n",
+ jcr, jcr->Job, jcr->job->MaxWaitTime);
+ return false;
+ }
+ Dmsg3(200, "Job %d (%s): MaxWaitTime of %d seconds exceeded, "
+ "checking status\n",
+ jcr->JobId, jcr->Job, jcr->job->MaxWaitTime);
+ switch (jcr->JobStatus) {
+ case JS_Created:
+ case JS_Blocked:
+ case JS_WaitFD:
+ case JS_WaitSD:
+ case JS_WaitStoreRes:
+ case JS_WaitClientRes:
+ case JS_WaitJobRes:
+ case JS_WaitPriority:
+ case JS_WaitMaxJobs:
+ case JS_WaitStartTime:
+ cancel = true;
+ Dmsg0(200, "JCR blocked in #1\n");
+ break;
+ case JS_Running:
+ Dmsg0(200, "JCR running, checking SD status\n");
+ switch (jcr->SDJobStatus) {
+ case JS_WaitMount:
+ case JS_WaitMedia:
+ case JS_WaitFD:
+ cancel = true;
+ Dmsg0(200, "JCR blocked in #2\n");
+ break;
+ default:
+ Dmsg0(200, "JCR not blocked in #2\n");
+ break;
+ }
+ break;
+ case JS_Terminated:
+ case JS_ErrorTerminated:
+ case JS_Canceled:
+ Dmsg0(200, "JCR already dead in #3\n");
+ break;
+ default:
+ Emsg1(M_ABORT, 0, _("Unhandled job status code %d\n"),
+ jcr->JobStatus);
+ }
+ Dmsg3(200, "MaxWaitTime result: %scancel JCR %p (%s)\n",
+ cancel ? "" : "do not ", jcr, jcr->job);
+
+ return cancel;
+}
+
+static bool job_check_maxruntime(JCR *control_jcr, JCR *jcr)
+{
+ bool cancel = false;
+
+ if (jcr->job->MaxRunTime == 0) {
+ return false;
+ }
+ if ((watchdog_time - jcr->start_time) < jcr->job->MaxRunTime) {
+ Dmsg3(200, "Job %p (%s) with MaxRunTime %d not expired\n",
+ jcr, jcr->Job, jcr->job->MaxRunTime);
+ return false;
+ }
+
+ switch (jcr->JobStatus) {
+ case JS_Created:
+ case JS_Blocked:
+ case JS_WaitFD:
+ case JS_WaitSD:
+ case JS_WaitStoreRes:
+ case JS_WaitClientRes:
+ case JS_WaitJobRes:
+ case JS_WaitPriority:
+ case JS_WaitMaxJobs:
+ case JS_WaitStartTime:
+ case JS_Running:
+ cancel = true;
+ break;
+ case JS_Terminated:
+ case JS_ErrorTerminated:
+ case JS_Canceled:
+ cancel = false;
+ break;
+ default:
+ Emsg1(M_ABORT, 0, _("Unhandled job status code %d\n"),
+ jcr->JobStatus);
+ }
+
+ Dmsg3(200, "MaxRunTime result: %scancel JCR %p (%s)\n",
+ cancel ? "" : "do not ", jcr, jcr->job);
+
+ return cancel;
+}
+
/*
* Run a job -- typically called by the scheduler, but may also
* be called by the UA (Console program).
{
int stat, errstat;
+ P(jcr->mutex);
sm_check(__FILE__, __LINE__, True);
init_msg(jcr, jcr->messages);
create_unique_job_name(jcr, jcr->job->hdr.name);
/* Initialize termination condition variable */
if ((errstat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) {
Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), strerror(errstat));
- set_jcr_job_status(jcr, JS_ErrorTerminated);
- free_jcr(jcr);
- return;
+ goto bail_out;
}
/*
if (jcr->db) {
Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(jcr->db));
}
- set_jcr_job_status(jcr, JS_ErrorTerminated);
- free_jcr(jcr);
- return;
+ goto bail_out;
}
Dmsg0(50, "DB opened\n");
jcr->jr.JobStatus = jcr->JobStatus;
if (!db_create_job_record(jcr, jcr->db, &jcr->jr)) {
Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(jcr->db));
- set_jcr_job_status(jcr, JS_ErrorTerminated);
- free_jcr(jcr);
- return;
+ goto bail_out;
}
jcr->JobId = jcr->jr.JobId;
ASSERT(jcr->jr.JobId > 0);
Emsg1(M_ABORT, 0, _("Could not add job queue: ERR=%s\n"), strerror(stat));
}
Dmsg0(100, "Done run_job()\n");
+
+ V(jcr->mutex);
+ return;
+
+bail_out:
+ set_jcr_job_status(jcr, JS_ErrorTerminated);
+ V(jcr->mutex);
+ return;
+
+}
+
+/*
+ * Cancel a job -- typically called by the UA (Console program), but may also
+ * be called by the job watchdog.
+ *
+ * Returns: 1 if cancel appears to be successful
+ * 0 on failure. Message sent to ua->jcr.
+ */
+int cancel_job(UAContext *ua, JCR *jcr)
+{
+ BSOCK *sd, *fd;
+
+ switch (jcr->JobStatus) {
+ case JS_Created:
+ case JS_WaitJobRes:
+ case JS_WaitClientRes:
+ case JS_WaitStoreRes:
+ case JS_WaitPriority:
+ case JS_WaitMaxJobs:
+ case JS_WaitStartTime:
+ set_jcr_job_status(jcr, JS_Canceled);
+ bsendmsg(ua, _("JobId %d, Job %s marked to be canceled.\n"),
+ jcr->JobId, jcr->Job);
+ jobq_remove(&job_queue, jcr); /* attempt to remove it from queue */
+ return 1;
+
+ default:
+ set_jcr_job_status(jcr, JS_Canceled);
+
+ /* Cancel File daemon */
+ if (jcr->file_bsock) {
+ ua->jcr->client = jcr->client;
+ if (!connect_to_file_daemon(ua->jcr, 10, FDConnectTimeout, 1)) {
+ bsendmsg(ua, _("Failed to connect to File daemon.\n"));
+ return 0;
+ }
+ Dmsg0(200, "Connected to file daemon\n");
+ fd = ua->jcr->file_bsock;
+ bnet_fsend(fd, "cancel Job=%s\n", jcr->Job);
+ while (bnet_recv(fd) >= 0) {
+ bsendmsg(ua, "%s", fd->msg);
+ }
+ bnet_sig(fd, BNET_TERMINATE);
+ bnet_close(fd);
+ ua->jcr->file_bsock = NULL;
+ }
+
+ /* Cancel Storage daemon */
+ if (jcr->store_bsock) {
+ ua->jcr->store = jcr->store;
+ if (!connect_to_storage_daemon(ua->jcr, 10, SDConnectTimeout, 1)) {
+ bsendmsg(ua, _("Failed to connect to Storage daemon.\n"));
+ return 0;
+ }
+ Dmsg0(200, "Connected to storage daemon\n");
+ sd = ua->jcr->store_bsock;
+ bnet_fsend(sd, "cancel Job=%s\n", jcr->Job);
+ while (bnet_recv(sd) >= 0) {
+ bsendmsg(ua, "%s", sd->msg);
+ }
+ bnet_sig(sd, BNET_TERMINATE);
+ bnet_close(sd);
+ ua->jcr->store_bsock = NULL;
+ }
+ }
+
+ return 1;
}
/*
- * This is the engine called by job_add() when we were pulled
+ * This is the engine called by jobq.c:jobq_add() when we were pulled
* from the work queue.
* At this point, we are running in our own thread and all
* necessary resources are allocated -- see jobq.c
}
wtime = jcr->sched_time - time(NULL);
}
+ P(jcr->mutex); /* lock jcr */
jobq_add(jq, jcr);
+ V(jcr->mutex);
+ free_jcr(jcr); /* we are done with jcr */
Dmsg0(100, "Exit sched_wait\n");
return NULL;
}
/*
* Add a job to the queue
* jq is a queue that was created with jobq_init
+ *
+ * On entry jcr->mutex must be locked.
*
*/
int jobq_add(jobq_t *jq, JCR *jcr)
return EINVAL;
}
+ jcr->use_count++; /* mark jcr in use by us */
+
if (!job_canceled(jcr) && wtime > 0) {
set_thread_concurrency(jq->max_workers + 2);
sched_pkt = (wait_pkt *)malloc(sizeof(wait_pkt));
sched_pkt->jcr = jcr;
sched_pkt->jq = jq;
stat = pthread_create(&id, &jq->attr, sched_wait, (void *)sched_pkt);
+ if (!stat) { /* thread not created */
+ jcr->use_count--; /* release jcr */
+ }
return stat;
}
if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
+ jcr->use_count--; /* release jcr */
return stat;
}
if ((item = (jobq_item_t *)malloc(sizeof(jobq_item_t))) == NULL) {
+ jcr->use_count--; /* release jcr */
return ENOMEM;
}
item->jcr = jcr;
}
/*
- * Remove a job from the job queue. Used only by cancel Console command.
+ * Remove a job from the job queue. Used only by cancel_job().
* jq is a queue that was created with jobq_init
* work_item is an element of work
*
- * Note, it is "removed" by immediately calling a processing routine.
- * if you want to cancel it, you need to provide some external means
- * of doing so.
+ * Note, it is "removed" from the job queue.
+ * If you want to cancel it, you need to provide some external means
+ * of doing so (e.g. pthread_kill()).
*/
int jobq_remove(jobq_t *jq, JCR *jcr)
{
Dmsg0(100, "Call to run new job\n");
V(jq->mutex);
run_job(njcr); /* This creates a "new" job */
+ free_jcr(njcr); /* release "new" jcr */
P(jq->mutex);
Dmsg0(100, "Back from running new job.\n");
}
extern void update_job_end_record(JCR *jcr);
extern int get_or_create_client_record(JCR *jcr);
extern void run_job(JCR *jcr);
+extern int cancel_job(UAContext *ua, JCR *jcr);
/* mountreq.c */
extern void mount_request(JCR *jcr, BSOCK *bs, char *buf);
/* ua_server.c */
void bsendmsg(void *sock, char *fmt, ...);
UAContext *new_ua_context(JCR *jcr);
+JCR *create_control_jcr(char *base_name, int job_type);
void free_ua_context(UAContext *ua);
/* ua_select.c */
scan_err0(lc, _("Bad time specification."));
/* NOT REACHED */
}
+ /****FIXME**** convert to UTC */
set_bit(code, lrun.hour);
lrun.minute = code2;
have_hour = true;
*/
static int cancel_cmd(UAContext *ua, char *cmd)
{
- int i;
+ int i, ret;
int njobs = 0;
- BSOCK *sd, *fd;
JCR *jcr = NULL;
char JobName[MAX_NAME_LENGTH];
return 1;
}
}
+ /* NOTE! This increments the ref_count */
jcr = get_jcr_by_full_name(JobName);
if (!jcr) {
bsendmsg(ua, _("Job %s not found.\n"), JobName);
return 1;
}
}
-
- switch (jcr->JobStatus) {
- case JS_Created:
- case JS_WaitJobRes:
- case JS_WaitClientRes:
- case JS_WaitStoreRes:
- case JS_WaitPriority:
- case JS_WaitMaxJobs:
- case JS_WaitStartTime:
- set_jcr_job_status(jcr, JS_Canceled);
- bsendmsg(ua, _("JobId %d, Job %s marked to be canceled.\n"),
- jcr->JobId, jcr->Job);
- jobq_remove(&job_queue, jcr); /* attempt to remove it from queue */
- free_jcr(jcr); /* this decrements the use count only */
- return 1;
-
- default:
- set_jcr_job_status(jcr, JS_Canceled);
-
- /* Cancel File daemon */
- if (jcr->file_bsock) {
- ua->jcr->client = jcr->client;
- if (!connect_to_file_daemon(ua->jcr, 10, FDConnectTimeout, 1)) {
- bsendmsg(ua, _("Failed to connect to File daemon.\n"));
- free_jcr(jcr);
- return 1;
- }
- Dmsg0(200, "Connected to file daemon\n");
- fd = ua->jcr->file_bsock;
- bnet_fsend(fd, "cancel Job=%s\n", jcr->Job);
- while (bnet_recv(fd) >= 0) {
- bsendmsg(ua, "%s", fd->msg);
- }
- bnet_sig(fd, BNET_TERMINATE);
- bnet_close(fd);
- ua->jcr->file_bsock = NULL;
- }
- /* Cancel Storage daemon */
- if (jcr->store_bsock) {
- ua->jcr->store = jcr->store;
- if (!connect_to_storage_daemon(ua->jcr, 10, SDConnectTimeout, 1)) {
- bsendmsg(ua, _("Failed to connect to Storage daemon.\n"));
- free_jcr(jcr);
- return 1;
- }
- Dmsg0(200, "Connected to storage daemon\n");
- sd = ua->jcr->store_bsock;
- bnet_fsend(sd, "cancel Job=%s\n", jcr->Job);
- while (bnet_recv(sd) >= 0) {
- bsendmsg(ua, "%s", sd->msg);
- }
- bnet_sig(sd, BNET_TERMINATE);
- bnet_close(sd);
- ua->jcr->store_bsock = NULL;
- }
- }
+ ret = cancel_job(ua, jcr);
free_jcr(jcr);
- return 1;
+ return ret;
}
/*
verify_job = job->verify_job;
}
- /* Create JCR to run job */
+ /*
+ * Create JCR to run job. NOTE!!! after this point, free_jcr()
+ * before returning.
+ */
jcr = new_jcr(sizeof(JCR), dird_free_jcr);
set_jcr_defaults(jcr, job);
}
if (!jcr->replace) {
bsendmsg(ua, _("Invalid replace option: %s\n"), replace);
- return 0;
+ goto bail_out;
}
} else if (job->replace) {
jcr->replace = job->replace;
switch (jcr->JobType) {
char ec1[30];
char dt[MAX_TIME_LENGTH];
- case JT_ADMIN:
+ case JT_ADMIN:
bsendmsg(ua, _("Run %s job\n\
JobName: %s\n\
FileSet: %s\n\
NPRT(jcr->store->hdr.name),
bstrutime(dt, sizeof(dt), jcr->sched_time),
jcr->JobPriority);
- jcr->JobLevel = L_FULL;
- break;
- case JT_BACKUP:
- case JT_VERIFY:
- if (level_name) {
- /* Look up level name and pull code */
- found = 0;
- for (i=0; joblevels[i].level_name; i++) {
- if (strcasecmp(level_name, _(joblevels[i].level_name)) == 0) {
- jcr->JobLevel = joblevels[i].level;
- found = 1;
- break;
- }
- }
- if (!found) {
- bsendmsg(ua, _("Level %s not valid.\n"), level_name);
- free_jcr(jcr);
- return 1;
+ jcr->JobLevel = L_FULL;
+ break;
+ case JT_BACKUP:
+ case JT_VERIFY:
+ if (level_name) {
+ /* Look up level name and pull code */
+ found = 0;
+ for (i=0; joblevels[i].level_name; i++) {
+ if (strcasecmp(level_name, _(joblevels[i].level_name)) == 0) {
+ jcr->JobLevel = joblevels[i].level;
+ found = 1;
+ break;
}
}
- level_name = NULL;
- if (jcr->JobType == JT_BACKUP) {
- bsendmsg(ua, _("Run %s job\n\
+ if (!found) {
+ bsendmsg(ua, _("Level %s not valid.\n"), level_name);
+ goto bail_out;
+ }
+ }
+ level_name = NULL;
+ if (jcr->JobType == JT_BACKUP) {
+ bsendmsg(ua, _("Run %s job\n\
JobName: %s\n\
FileSet: %s\n\
Level: %s\n\
NPRT(jcr->pool->hdr.name),
bstrutime(dt, sizeof(dt), jcr->sched_time),
jcr->JobPriority);
- } else { /* JT_VERIFY */
- char *Name;
- if (jcr->job->verify_job) {
- Name = jcr->job->verify_job->hdr.name;
- } else {
- Name = "";
- }
- bsendmsg(ua, _("Run %s job\n\
+ } else { /* JT_VERIFY */
+ char *Name;
+ if (jcr->job->verify_job) {
+ Name = jcr->job->verify_job->hdr.name;
+ } else {
+ Name = "";
+ }
+ bsendmsg(ua, _("Run %s job\n\
JobName: %s\n\
FileSet: %s\n\
Level: %s\n\
Verify Job: %s\n\
When: %s\n\
Priority: %d\n"),
- _("Verify"),
- job->hdr.name,
- jcr->fileset->hdr.name,
- level_to_str(jcr->JobLevel),
- jcr->client->hdr.name,
- jcr->store->hdr.name,
- NPRT(jcr->pool->hdr.name),
- Name,
- bstrutime(dt, sizeof(dt), jcr->sched_time),
- jcr->JobPriority);
- }
- break;
- case JT_RESTORE:
- if (jcr->RestoreJobId == 0 && !jcr->RestoreBootstrap) {
- if (jid) {
- jcr->RestoreJobId = atoi(jid);
- } else {
- if (!get_pint(ua, _("Please enter a JobId for restore: "))) {
- free_jcr(jcr);
- return 1;
- }
- jcr->RestoreJobId = ua->pint32_val;
- }
+ _("Verify"),
+ job->hdr.name,
+ jcr->fileset->hdr.name,
+ level_to_str(jcr->JobLevel),
+ jcr->client->hdr.name,
+ jcr->store->hdr.name,
+ NPRT(jcr->pool->hdr.name),
+ Name,
+ bstrutime(dt, sizeof(dt), jcr->sched_time),
+ jcr->JobPriority);
+ }
+ break;
+ case JT_RESTORE:
+ if (jcr->RestoreJobId == 0 && !jcr->RestoreBootstrap) {
+ if (jid) {
+ jcr->RestoreJobId = atoi(jid);
+ } else {
+ if (!get_pint(ua, _("Please enter a JobId for restore: "))) {
+ goto bail_out;
+ }
+ jcr->RestoreJobId = ua->pint32_val;
}
- jcr->JobLevel = L_FULL; /* default level */
- Dmsg1(20, "JobId to restore=%d\n", jcr->RestoreJobId);
- if (jcr->RestoreJobId == 0) {
- bsendmsg(ua, _("Run Restore job\n\
+ }
+ jcr->JobLevel = L_FULL; /* default level */
+ Dmsg1(20, "JobId to restore=%d\n", jcr->RestoreJobId);
+ if (jcr->RestoreJobId == 0) {
+ bsendmsg(ua, _("Run Restore job\n\
JobName: %s\n\
Bootstrap: %s\n\
Where: %s\n\
Storage: %s\n\
When: %s\n\
Priority: %d\n"),
- job->hdr.name,
- NPRT(jcr->RestoreBootstrap),
- jcr->where?jcr->where:NPRT(job->RestoreWhere),
- replace,
- jcr->fileset->hdr.name,
- jcr->client->hdr.name,
- jcr->store->hdr.name,
- bstrutime(dt, sizeof(dt), jcr->sched_time),
- jcr->JobPriority);
- } else {
- bsendmsg(ua, _("Run Restore job\n\
+ job->hdr.name,
+ NPRT(jcr->RestoreBootstrap),
+ jcr->where?jcr->where:NPRT(job->RestoreWhere),
+ replace,
+ jcr->fileset->hdr.name,
+ jcr->client->hdr.name,
+ jcr->store->hdr.name,
+ bstrutime(dt, sizeof(dt), jcr->sched_time),
+ jcr->JobPriority);
+ } else {
+ bsendmsg(ua, _("Run Restore job\n\
JobName: %s\n\
Bootstrap: %s\n\
Where: %s\n\
JobId: %s\n\
When: %s\n\
Priority: %d\n"),
- job->hdr.name,
- NPRT(jcr->RestoreBootstrap),
- jcr->where?jcr->where:NPRT(job->RestoreWhere),
- replace,
- jcr->fileset->hdr.name,
- jcr->client->hdr.name,
- jcr->store->hdr.name,
- jcr->RestoreJobId==0?"*None*":edit_uint64(jcr->RestoreJobId, ec1),
- bstrutime(dt, sizeof(dt), jcr->sched_time),
- jcr->JobPriority);
- }
- break;
- default:
- bsendmsg(ua, _("Unknown Job Type=%d\n"), jcr->JobType);
- free_jcr(jcr);
- return 0;
+ job->hdr.name,
+ NPRT(jcr->RestoreBootstrap),
+ jcr->where?jcr->where:NPRT(job->RestoreWhere),
+ replace,
+ jcr->fileset->hdr.name,
+ jcr->client->hdr.name,
+ jcr->store->hdr.name,
+ jcr->RestoreJobId==0?"*None*":edit_uint64(jcr->RestoreJobId, ec1),
+ bstrutime(dt, sizeof(dt), jcr->sched_time),
+ jcr->JobPriority);
+ }
+ break;
+ default:
+ bsendmsg(ua, _("Unknown Job Type=%d\n"), jcr->JobType);
+ goto bail_out;
}
/* Run without prompting? */
if (find_arg(ua, _("yes")) > 0) {
Dmsg1(200, "Calling run_job job=%x\n", jcr->job);
run_job(jcr);
+ free_jcr(jcr); /* release jcr */
bsendmsg(ua, _("Run command submitted.\n"));
return 1;
}
if (!get_cmd(ua, _("OK to run? (yes/mod/no): "))) {
- free_jcr(jcr);
- return 0; /* do not run */
+ goto bail_out;
}
/*
* At user request modify parameters of job to be run.
*/
if (ua->cmd[0] == 0) {
- bsendmsg(ua, _("Job not run.\n"));
- free_jcr(jcr);
- return 0; /* do not run */
+ goto bail_out;
}
if (strncasecmp(ua->cmd, _("mod"), strlen(ua->cmd)) == 0) {
FILE *fd;
default:
goto try_again;
}
- bsendmsg(ua, _("Job not run.\n"));
- free_jcr(jcr);
- return 0; /* error do no run Job */
+ goto bail_out;
}
if (strncasecmp(ua->cmd, _("yes"), strlen(ua->cmd)) == 0) {
Dmsg1(200, "Calling run_job job=%x\n", jcr->job);
run_job(jcr);
+ free_jcr(jcr); /* release jcr */
bsendmsg(ua, _("Run command submitted.\n"));
return 1;
}
+bail_out:
bsendmsg(ua, _("Job not run.\n"));
free_jcr(jcr);
return 0; /* do not run */
-
}
}
/*
- * Create a Job Control Record for a console "job"
+ * Create a Job Control Record for a control "job",
* filling in all the appropriate fields.
*/
-static JCR *create_console_jcr()
+JCR *create_control_jcr(char *base_name, int job_type)
{
JCR *jcr;
jcr = new_jcr(sizeof(JCR), dird_free_jcr);
jcr->sd_auth_key = bstrdup("dummy"); /* dummy Storage daemon key */
- create_unique_job_name(jcr, "*Console*");
+ create_unique_job_name(jcr, base_name);
jcr->sched_time = jcr->start_time;
- jcr->JobType = JT_CONSOLE;
+ jcr->JobType = job_type;
jcr->JobLevel = L_FULL;
jcr->JobStatus = JS_Running;
- /* None of these are really defined for the Console, so we
+ /* None of these are really defined for control JCRs, so we
* simply take the first of each one. This ensures that there
* will be no null pointer references.
*/
pthread_detach(pthread_self());
- jcr = create_console_jcr();
+ jcr = create_control_jcr("*Console*", JT_CONSOLE);
ua = new_ua_context(jcr);
ua->UA_sock = UA_sock;
lock_jcr_chain();
for (jcr=NULL; (jcr=get_next_jcr(jcr)); njobs++) {
if (jcr->JobId == 0) { /* this is us */
- bstrftime(dt, sizeof(dt), jcr->start_time);
- strcpy(dt+7, dt+9); /* cut century */
- bsendmsg(ua, _("Console connected at %s\n"), dt);
+ /* this is a console or other control job. We only show console
+ * jobs in the status output.
+ */
+ if (jcr->JobType == JT_CONSOLE) {
+ bstrftime(dt, sizeof(dt), jcr->start_time);
+ strcpy(dt+7, dt+9); /* cut century */
+ bsendmsg(ua, _("Console connected at %s\n"), dt);
+ }
njobs--;
}
free_locked_jcr(jcr);
start_watchdog(); /* start watchdog thread */
+ init_jcr_subsystem(); /* start JCR watchdogs etc. */
+
if (inetd_request) {
/* Socket is on fd 0 */
BSOCK *bs = init_bsock(NULL, 0, "client", "unknown client", me->FDport);
void terminate_filed(int sig)
{
- stop_watchdog();
-
if (configfile != NULL) {
free(configfile);
}
delete_pid_file(me->pid_directory, "bacula-fd", me->FDport);
free_config_resources();
term_msg();
+ stop_watchdog();
close_memory_pool(); /* release free memory in pool */
sm_dump(False); /* dump orphaned buffers */
exit(1);
#define JT_VERIFY 'V' /* Verify Job */
#define JT_RESTORE 'R' /* Restore Job */
#define JT_CONSOLE 'C' /* console program */
+#define JT_SYSTEM 'S' /* internal system "job" */
#define JT_ADMIN 'D' /* admin job */
#define JT_ARCHIVE 'A'
/* The following routines are found in lib/jcr.c */
+extern bool init_jcr_subsystem(void);
extern JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr);
extern void free_locked_jcr(JCR *jcr);
extern JCR *get_jcr_by_id(uint32_t JobId);
md5.c message.c mem_pool.c parse_conf.c \
queue.c rwlock.c scan.c serial.c sha1.c \
semlock.c signal.c smartall.c tree.c \
- util.c var.c watchdog.c workq.c
+ util.c var.c watchdog.c workq.c timers.c
LIBOBJS = alloc.o attr.o base64.o bsys.o bget_msg.o \
md5.o message.o mem_pool.o parse_conf.o \
queue.o rwlock.o scan.o serial.o sha1.o \
semlock.o signal.o smartall.o tree.o \
- util.o var.o watchdog.o workq.o
+ util.o var.o watchdog.o workq.o timers.o
EXTRAOBJS = @OBJLIST@
#include "bacula.h"
#include "jcr.h"
-extern void timeout_handler(int sig);
+/* External variables we reference */
+extern time_t watchdog_time;
+
+/* Forward referenced functions */
+static void timeout_handler(int sig);
+static void jcr_timeout_check(watchdog_t *self);
struct s_last_job last_job; /* last job run by this daemon */
dlist *last_jobs;
static JCR *jobs = NULL; /* pointer to JCR chain */
/* Mutex for locking various jcr chains while updating */
-static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t jcr_chain_mutex = PTHREAD_MUTEX_INITIALIZER;
void init_last_jobs_list()
{
void lock_last_jobs_list()
{
- P(mutex);
+ /* Use jcr chain mutex */
+ P(jcr_chain_mutex);
}
void unlock_last_jobs_list()
{
- V(mutex);
+ /* Use jcr chain mutex */
+ V(jcr_chain_mutex);
}
/*
sigfillset(&sigtimer.sa_mask);
sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL);
- P(mutex);
+ P(jcr_chain_mutex);
jcr->prev = NULL;
jcr->next = jobs;
if (jobs) {
jobs->prev = jcr;
}
jobs = jcr;
- V(mutex);
+ V(jcr_chain_mutex);
return jcr;
}
#endif
struct s_last_job *je;
- P(mutex);
+ P(jcr_chain_mutex);
jcr->use_count--; /* decrement use count */
Dmsg3(200, "Dec jcr 0x%x use_count=%d jobid=%d\n", jcr, jcr->use_count, jcr->JobId);
if (jcr->use_count > 0) { /* if in use */
- V(mutex);
+ V(jcr_chain_mutex);
Dmsg2(200, "jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
return;
}
last_job.JobId = 0; /* zap last job */
}
close_msg(NULL); /* flush any daemon messages */
- V(mutex);
+ V(jcr_chain_mutex);
Dmsg0(200, "Exit free_jcr\n");
}
{
JCR *jcr;
- P(mutex);
+ P(jcr_chain_mutex); /* lock chain */
for (jcr = jobs; jcr; jcr=jcr->next) {
if (jcr->JobId == JobId) {
+ P(jcr->mutex);
jcr->use_count++;
+ V(jcr->mutex);
Dmsg2(200, "Inc jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
break;
}
}
- V(mutex);
+ V(jcr_chain_mutex);
return jcr;
}
{
JCR *jcr;
- P(mutex);
+ P(jcr_chain_mutex);
for (jcr = jobs; jcr; jcr=jcr->next) {
if (jcr->VolSessionId == SessionId &&
jcr->VolSessionTime == SessionTime) {
+ P(jcr->mutex);
jcr->use_count++;
+ V(jcr->mutex);
Dmsg2(200, "Inc jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
break;
}
}
- V(mutex);
+ V(jcr_chain_mutex);
return jcr;
}
if (!Job) {
return NULL;
}
- P(mutex);
+ P(jcr_chain_mutex);
len = strlen(Job);
for (jcr = jobs; jcr; jcr=jcr->next) {
if (strncmp(Job, jcr->Job, len) == 0) {
+ P(jcr->mutex);
jcr->use_count++;
+ V(jcr->mutex);
Dmsg2(200, "Inc jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
break;
}
}
- V(mutex);
+ V(jcr_chain_mutex);
return jcr;
}
if (!Job) {
return NULL;
}
- P(mutex);
+ P(jcr_chain_mutex);
for (jcr = jobs; jcr; jcr=jcr->next) {
if (strcmp(jcr->Job, Job) == 0) {
+ P(jcr->mutex);
jcr->use_count++;
+ V(jcr->mutex);
Dmsg2(200, "Inc jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
break;
}
}
- V(mutex);
+ V(jcr_chain_mutex);
return jcr;
}
*/
void lock_jcr_chain()
{
- P(mutex);
+ P(jcr_chain_mutex);
}
/*
*/
void unlock_jcr_chain()
{
- V(mutex);
+ V(jcr_chain_mutex);
}
rjcr = jcr->next;
}
if (rjcr) {
+ P(rjcr->mutex);
rjcr->use_count++;
+ V(rjcr->mutex);
Dmsg1(200, "Inc jcr use_count=%d\n", rjcr->use_count);
}
return rjcr;
}
+
+bool init_jcr_subsystem(void)
+{
+ watchdog_t *wd = watchdog_new();
+
+ wd->one_shot = false;
+ wd->interval = 30; /* FIXME: should be configurable somewhere, even
+ if only with a #define */
+ wd->callback = jcr_timeout_check;
+
+ register_watchdog(wd);
+
+ return true;
+}
+
+static void jcr_timeout_check(watchdog_t *self)
+{
+ JCR *jcr;
+ BSOCK *fd;
+ time_t timer_start, now;
+
+ Dmsg0(200, "Start JCR timeout checks\n");
+
+ /* Walk through all JCRs checking if any one is
+ * blocked for more than specified max time.
+ */
+ lock_jcr_chain();
+ for (jcr=NULL; (jcr=get_next_jcr(jcr)); ) {
+ free_locked_jcr(jcr); /* OK to free now cuz chain is locked */
+ if (jcr->JobId == 0) {
+ continue;
+ }
+ fd = jcr->store_bsock;
+ if (fd) {
+ timer_start = fd->timer_start;
+ if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
+ fd->timer_start = 0; /* turn off timer */
+ fd->timed_out = TRUE;
+ Jmsg(jcr, M_ERROR, 0, _(
+"Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"),
+ watchdog_time - timer_start);
+ pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
+ }
+ }
+ fd = jcr->file_bsock;
+ if (fd) {
+ timer_start = fd->timer_start;
+ if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
+ fd->timer_start = 0; /* turn off timer */
+ fd->timed_out = TRUE;
+ Jmsg(jcr, M_ERROR, 0, _(
+"Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"),
+ watchdog_time - timer_start);
+ pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
+ }
+ }
+ fd = jcr->dir_bsock;
+ if (fd) {
+ timer_start = fd->timer_start;
+ if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
+ fd->timer_start = 0; /* turn off timer */
+ fd->timed_out = TRUE;
+ Jmsg(jcr, M_ERROR, 0, _(
+"Watchdog sending kill after %d secs to thread stalled reading Director.\n"),
+ watchdog_time - timer_start);
+ pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
+ }
+ }
+
+ }
+ unlock_jcr_chain();
+
+ Dmsg0(200, "Finished JCR timeout checks\n");
+}
+
+/*
+ * Timeout signal comes here
+ */
+void timeout_handler(int sig)
+{
+ return; /* thus interrupting the function */
+}
#include "sha1.h"
#include "tree.h"
#include "watchdog.h"
+#include "timers.h"
#include "bpipe.h"
#include "attr.h"
#include "var.h"
/* watchdog.c */
int start_watchdog(void);
int stop_watchdog(void);
-btimer_t *start_child_timer(pid_t pid, uint32_t wait);
+watchdog_t *watchdog_new(void);
+bool register_watchdog(watchdog_t *wd);
+bool unregister_watchdog(watchdog_t *wd);
+bool unregister_watchdog_unlocked(watchdog_t *wd);
+
+/* timers.c */
+btimer_id start_child_timer(pid_t pid, uint32_t wait);
void stop_child_timer(btimer_id wid);
btimer_id start_thread_timer(pthread_t tid, uint32_t wait);
void stop_thread_timer(btimer_id wid);
#include "bacula.h"
#include "jcr.h"
+/* This breaks Kern's #include rules, but I don't want to put it into bacula.h
+ * until it has been discussed with him */
+#include "bsd_queue.h"
+
/* Exported globals */
time_t watchdog_time; /* this has granularity of SLEEP_TIME */
-#define SLEEP_TIME 30 /* examine things every 30 seconds */
+#define SLEEP_TIME 1 /* examine things every second */
/* Forward referenced functions */
-static void *btimer_thread(void *arg);
-static void stop_btimer(btimer_id wid);
-static btimer_id btimer_start_common(uint32_t wait);
+static void *watchdog_thread(void *arg);
/* Static globals */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t timer = PTHREAD_COND_INITIALIZER;
static int quit;
-static btimer_t *timer_chain = NULL;
+static bool wd_is_init = false;
+/* Forward referenced callback functions */
+static void callback_child_timer(watchdog_t *self);
+static void callback_thread_timer(watchdog_t *self);
+static pthread_t wd_tid;
-/*
- * Timeout signal comes here
- */
-void timeout_handler(int sig)
-{
- return; /* thus interrupting the function */
-}
-
+/* Static globals */
+static TAILQ_HEAD(/* no struct */, s_watchdog_t) wd_queue =
+ TAILQ_HEAD_INITIALIZER(wd_queue);
+static TAILQ_HEAD(/* no struct */, s_watchdog_t) wd_inactive =
+ TAILQ_HEAD_INITIALIZER(wd_inactive);
/*
* Start watchdog thread
int start_watchdog(void)
{
int stat;
- pthread_t wdid;
- struct sigaction sigtimer;
-
- sigtimer.sa_flags = 0;
- sigtimer.sa_handler = timeout_handler;
- sigfillset(&sigtimer.sa_mask);
- sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL);
+
+ Dmsg0(200, "Initialising NicB-hacked watchdog thread\n");
watchdog_time = time(NULL);
quit = FALSE;
- if ((stat = pthread_create(&wdid, NULL, btimer_thread, (void *)NULL)) != 0) {
+ if ((stat = pthread_create(&wd_tid, NULL, watchdog_thread, NULL)) != 0) {
return stat;
}
+ wd_is_init = true;
return 0;
}
int stop_watchdog(void)
{
int stat;
+ watchdog_t *p, *n;
- quit = TRUE;
- P(mutex);
- if ((stat = pthread_cond_signal(&timer)) != 0) {
- V(mutex);
- return stat;
+ if (!wd_is_init) {
+ Emsg0(M_ABORT, 0, "BUG! stop_watchdog called before start_watchdog\n");
}
- V(mutex);
- return 0;
-}
-
-
-/*
- * This is the actual watchdog thread.
- */
-static void *btimer_thread(void *arg)
-{
- JCR *jcr;
- BSOCK *fd;
- btimer_t *wid;
-
- Dmsg0(200, "Start watchdog thread\n");
- pthread_detach(pthread_self());
- for ( ;!quit; ) {
- time_t timer_start, now;
+ Dmsg0(200, "Sending stop signal to NicB-hacked watchdog thread\n");
+ P(mutex);
+ quit = true;
+ stat = pthread_cond_signal(&timer);
+ V(mutex);
- Dmsg0(200, "Top of watchdog loop\n");
+ wd_is_init = false;
- watchdog_time = time(NULL); /* update timer */
-
- /* Walk through all JCRs checking if any one is
- * blocked for more than specified max time.
- */
- lock_jcr_chain();
- for (jcr=NULL; (jcr=get_next_jcr(jcr)); ) {
- free_locked_jcr(jcr);
- if (jcr->JobId == 0) {
- continue;
- }
- fd = jcr->store_bsock;
- if (fd) {
- timer_start = fd->timer_start;
- if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
- fd->timer_start = 0; /* turn off timer */
- fd->timed_out = TRUE;
- Jmsg(jcr, M_ERROR, 0, _(
-"Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"),
- watchdog_time - timer_start);
- pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
- }
- }
- fd = jcr->file_bsock;
- if (fd) {
- timer_start = fd->timer_start;
- if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
- fd->timer_start = 0; /* turn off timer */
- fd->timed_out = TRUE;
- Jmsg(jcr, M_ERROR, 0, _(
-"Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"),
- watchdog_time - timer_start);
- pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
- }
- }
- fd = jcr->dir_bsock;
- if (fd) {
- timer_start = fd->timer_start;
- if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
- fd->timer_start = 0; /* turn off timer */
- fd->timed_out = TRUE;
- Jmsg(jcr, M_ERROR, 0, _(
-"Watchdog sending kill after %d secs to thread stalled reading Director.\n"),
- watchdog_time - timer_start);
- pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
- }
- }
+ stat = pthread_join(wd_tid, NULL);
+ TAILQ_FOREACH_SAFE(p, &wd_queue, qe, n) {
+ TAILQ_REMOVE(&wd_queue, p, qe);
+ if (p->destructor != NULL) {
+ p->destructor(p);
}
- unlock_jcr_chain();
-
- Dmsg0(200, "Watchdog sleep.\n");
- bmicrosleep(SLEEP_TIME, 0);
- now = time(NULL);
+ free(p);
+ }
- /*
- * Now handle child and thread timers set by the code.
- */
- /* Walk child chain killing off any process overdue */
- P(mutex);
- for (wid = timer_chain; wid; wid=wid->next) {
- int killed = FALSE;
- /* First ask him politely to go away */
- if (!wid->killed && now > (wid->start_time + wid->wait)) {
-// Dmsg1(000, "Watchdog sigterm pid=%d\n", wid->pid);
- if (wid->type == TYPE_CHILD) {
- kill(wid->pid, SIGTERM);
- killed = TRUE;
- } else {
- Dmsg1(200, "watchdog kill thread %d\n", wid->tid);
- pthread_kill(wid->tid, TIMEOUT_SIGNAL);
- wid->killed = TRUE;
- }
- }
- /* If we asked a child to die, wait 3 seconds and slam him */
- if (killed) {
- btimer_t *wid1;
- bmicrosleep(3, 0);
- for (wid1 = timer_chain; wid1; wid1=wid1->next) {
- if (wid->type == TYPE_CHILD &&
- !wid1->killed && now > (wid1->start_time + wid1->wait)) {
- kill(wid1->pid, SIGKILL);
-// Dmsg1(000, "Watchdog killed pid=%d\n", wid->pid);
- wid1->killed = TRUE;
- }
- }
- }
+ TAILQ_FOREACH_SAFE(p, &wd_inactive, qe, n) {
+ TAILQ_REMOVE(&wd_inactive, p, qe);
+ if (p->destructor != NULL) {
+ p->destructor(p);
}
- V(mutex);
- } /* end of big for loop */
+ free(p);
+ }
- Dmsg0(200, "End watchdog\n");
- return NULL;
+ return stat;
}
-/*
- * Start a timer on a child process of pid, kill it after wait seconds.
- * NOTE! Granularity is SLEEP_TIME (i.e. 30 seconds)
- *
- * Returns: btimer_id (pointer to btimer_t struct) on success
- * NULL on failure
- */
-btimer_id start_child_timer(pid_t pid, uint32_t wait)
+watchdog_t *watchdog_new(void)
{
- btimer_t *wid;
- wid = btimer_start_common(wait);
- wid->pid = pid;
- wid->type = TYPE_CHILD;
- Dmsg2(200, "Start child timer 0x%x for %d secs.\n", wid, wait);
- return wid;
-}
+ watchdog_t *wd = (watchdog_t *) malloc(sizeof(watchdog_t));
-/*
- * Start a timer on a thread. kill it after wait seconds.
- * NOTE! Granularity is SLEEP_TIME (i.e. 30 seconds)
- *
- * Returns: btimer_id (pointer to btimer_t struct) on success
- * NULL on failure
- */
-btimer_id start_thread_timer(pthread_t tid, uint32_t wait)
-{
- btimer_t *wid;
- wid = btimer_start_common(wait);
- wid->tid = tid;
- wid->type = TYPE_PTHREAD;
- Dmsg2(200, "Start thread timer 0x%x for %d secs.\n", wid, wait);
- return wid;
+ if (!wd_is_init) {
+ Emsg0(M_ABORT, 0, "BUG! watchdog_new called before start_watchdog\n");
+ }
+
+ if (wd == NULL) {
+ return NULL;
+ }
+ wd->one_shot = true;
+ wd->interval = 0;
+ wd->callback = NULL;
+ wd->destructor = NULL;
+ wd->data = NULL;
+
+ return wd;
}
-static btimer_id btimer_start_common(uint32_t wait)
+bool register_watchdog(watchdog_t *wd)
{
- btimer_id wid = (btimer_id)malloc(sizeof(btimer_t));
+ if (!wd_is_init) {
+ Emsg0(M_ABORT, 0, "BUG! register_watchdog called before start_watchdog\n");
+ }
+ if (wd->callback == NULL) {
+ Emsg1(M_ABORT, 0, "BUG! Watchdog %p has NULL callback\n", wd);
+ }
+ if (wd->interval == 0) {
+ Emsg1(M_ABORT, 0, "BUG! Watchdog %p has zero interval\n", wd);
+ }
P(mutex);
- /* Chain it into timer_chain as the first item */
- wid->prev = NULL;
- wid->next = timer_chain;
- if (timer_chain) {
- timer_chain->prev = wid;
- }
- timer_chain = wid;
- wid->start_time = time(NULL);
- wid->wait = wait;
- wid->killed = FALSE;
+ wd->next_fire = watchdog_time + wd->interval;
+ TAILQ_INSERT_TAIL(&wd_queue, wd, qe);
+ Dmsg3(200, "Registered watchdog %p, interval %d%s\n",
+ wd, wd->interval, wd->one_shot ? " one shot" : "");
V(mutex);
- return wid;
-}
-/*
- * Stop child timer
- */
-void stop_child_timer(btimer_id wid)
-{
- Dmsg2(200, "Stop child timer 0x%x for %d secs.\n", wid, wid->wait);
- stop_btimer(wid);
+ return false;
}
-/*
- * Stop thread timer
- */
-void stop_thread_timer(btimer_id wid)
+bool unregister_watchdog_unlocked(watchdog_t *wd)
{
- if (!wid) {
- return;
+ watchdog_t *p, *n;
+
+ if (!wd_is_init) {
+ Emsg0(M_ABORT, 0, "BUG! unregister_watchdog_unlocked called before start_watchdog\n");
}
- Dmsg2(200, "Stop thread timer 0x%x for %d secs.\n", wid, wid->wait);
- stop_btimer(wid);
-}
+ TAILQ_FOREACH_SAFE(p, &wd_queue, qe, n) {
+ if (wd == p) {
+ TAILQ_REMOVE(&wd_queue, wd, qe);
+ Dmsg1(200, "Unregistered watchdog %p\n", wd);
+ return true;
+ }
+ }
-/*
- * Stop btimer
- */
-static void stop_btimer(btimer_id wid)
+ TAILQ_FOREACH_SAFE(p, &wd_inactive, qe, n) {
+ if (wd == p) {
+ TAILQ_REMOVE(&wd_inactive, wd, qe);
+ Dmsg1(200, "Unregistered inactive watchdog %p\n", wd);
+ return true;
+ }
+ }
+
+ Dmsg1(200, "Failed to unregister watchdog %p\n", wd);
+
+ return false;
+}
+
+bool unregister_watchdog(watchdog_t *wd)
{
- if (wid == NULL) {
- Emsg0(M_ABORT, 0, _("NULL btimer_id.\n"));
+ bool ret;
+
+ if (!wd_is_init) {
+ Emsg0(M_ABORT, 0, "BUG! unregister_watchdog called before start_watchdog\n");
}
+
P(mutex);
- /* Remove wid from timer_chain */
- if (!wid->prev) { /* if no prev */
- timer_chain = wid->next; /* set new head */
- } else {
- wid->prev->next = wid->next; /* update prev */
- }
- if (wid->next) {
- wid->next->prev = wid->prev; /* unlink it */
- }
+ ret = unregister_watchdog_unlocked(wd);
V(mutex);
- free(wid);
+
+ return ret;
+}
+
+static void *watchdog_thread(void *arg)
+{
+ Dmsg0(200, "NicB-reworked watchdog thread entered\n");
+
+ while (true) {
+ watchdog_t *p, *n;
+
+ P(mutex);
+ if (quit) {
+ V(mutex);
+ break;
+ }
+
+ watchdog_time = time(NULL);
+
+ TAILQ_FOREACH_SAFE(p, &wd_queue, qe, n) {
+ if (p->next_fire < watchdog_time) {
+ /* Run the callback */
+ p->callback(p);
+
+ /* Reschedule (or move to inactive list if it's a one-shot timer) */
+ if (p->one_shot) {
+ TAILQ_REMOVE(&wd_queue, p, qe);
+ TAILQ_INSERT_TAIL(&wd_inactive, p, qe);
+ } else {
+ p->next_fire = watchdog_time + p->interval;
+ }
+ }
+ }
+ V(mutex);
+ bmicrosleep(SLEEP_TIME, 0);
+ }
+
+ Dmsg0(200, "NicB-reworked watchdog thread exited\n");
+
+ return NULL;
}
#define TIMEOUT_SIGNAL SIGUSR2
-typedef struct s_btimer_t {
- struct s_btimer_t *next;
- struct s_btimer_t *prev;
- time_t start_time;
- int32_t wait;
- pid_t pid; /* process id if TYPE_CHILD */
- int killed;
- int type;
- pthread_t tid; /* thread id if TYPE_PTHREAD */
-} btimer_t;
-
-#define btimer_id btimer_t *
+/* This breaks Kern's #include rules, but I don't want to put it into bacula.h
+ * until it has been discussed with him */
+#include "bsd_queue.h"
+
+struct s_watchdog_t {
+ bool one_shot;
+ time_t interval;
+ void (*callback)(struct s_watchdog_t *wd);
+ void (*destructor)(struct s_watchdog_t *wd);
+ void *data;
+ /* Private data below - don't touch outside of watchdog.c */
+ TAILQ_ENTRY(s_watchdog_t) qe;
+ time_t next_fire;
+};
+typedef struct s_watchdog_t watchdog_t;
start_watchdog(); /* start watchdog thread */
+ init_jcr_subsystem(); /* start JCR watchdogs etc. */
+
/*
* Sleep a bit to give device thread a chance to lock the resource
* chain before we start the server.
}
delete_pid_file(me->pid_directory, "bacula-sd", me->SDport);
- stop_watchdog();
Dmsg1(200, "In terminate_stored() sig=%d\n", sig);
print_memory_pool_stats();
}
term_msg();
+ stop_watchdog();
close_memory_pool();
sm_dump(False); /* dump orphaned buffers */
#undef VERSION
#define VERSION "1.33"
#define VSTRING "1"
-#define BDATE "20 Dec 2003"
-#define LSMDATE "20Dec03"
+#define BDATE "22 Dec 2003"
+#define LSMDATE "22Dec03"
/* Debug flags */
#undef DEBUG