2 * Manipulation routines for Job Control Records and
3 * handling of last_jobs_list.
5 * Kern E. Sibbald, December 2000
9 * These routines are thread safe.
11 * The job list routines were re-written in May 2005 to
12 * eliminate the global lock while traversing the list, and
13 * to use the dlist subroutines. The locking is now done
14 * on the list each time the list is modified or traversed.
15 * That is, it is "micro-locked" rather than globally locked.
16 * The result is that there is one lock/unlock for each entry
17 * in the list while traversing it rather than a single lock
18 * at the beginning of a traversal and one at the end. This
19 * incurs slightly more overhead, but effectively eliminates
20 * the possibility of race conditions. In addition, with the
21 * exception of the global locking of the list during the
22 * re-reading of the config file, no recursion is needed.
26 Copyright (C) 2000-2006 Kern Sibbald
28 This program is free software; you can redistribute it and/or
29 modify it under the terms of the GNU General Public License
30 version 2 as amended with additional clauses defined in the
31 file LICENSE in the main source directory.
33 This program is distributed in the hope that it will be useful,
34 but WITHOUT ANY WARRANTY; without even the implied warranty of
35 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
36 file LICENSE for additional details.
43 /* External variables we reference */
44 extern time_t watchdog_time;
46 /* Forward referenced functions */
47 extern "C" void timeout_handler(int sig);
48 static void jcr_timeout_check(watchdog_t *self);
49 #ifdef TRACE_JCR_CHAIN
50 static void b_lock_jcr_chain(const char *filen, int line);
51 static void b_unlock_jcr_chain(const char *filen, int line);
52 #define lock_jcr_chain() b_lock_jcr_chain(__FILE__, __LINE__);
53 #define unlock_jcr_chain() b_unlock_jcr_chain(__FILE__, __LINE__);
55 static void lock_jcr_chain();
56 static void unlock_jcr_chain();
61 dlist *last_jobs = NULL;
62 const int max_last_jobs = 10;
64 static dlist *jcrs = NULL; /* JCR chain */
65 static pthread_mutex_t jcr_lock = PTHREAD_MUTEX_INITIALIZER;
67 static pthread_mutex_t job_start_mutex = PTHREAD_MUTEX_INITIALIZER;
79 void init_last_jobs_list()
82 struct s_last_job *job_entry = NULL;
84 last_jobs = New(dlist(job_entry, &job_entry->link));
87 jcrs = New(dlist(jcr, &jcr->link));
91 void term_last_jobs_list()
94 while (!last_jobs->empty()) {
95 void *je = last_jobs->first();
96 last_jobs->remove(je);
108 bool read_last_jobs_list(int fd, uint64_t addr)
110 struct s_last_job *je, job;
113 Dmsg1(100, "read_last_jobs seek to %d\n", (int)addr);
114 if (addr == 0 || lseek(fd, (off_t)addr, SEEK_SET) < 0) {
117 if (read(fd, &num, sizeof(num)) != sizeof(num)) {
120 Dmsg1(100, "Read num_items=%d\n", num);
121 if (num > 4 * max_last_jobs) { /* sanity check */
124 for ( ; num; num--) {
125 if (read(fd, &job, sizeof(job)) != sizeof(job)) {
126 Dmsg1(000, "Read job entry. ERR=%s\n", strerror(errno));
130 je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
131 memcpy((char *)je, (char *)&job, sizeof(job));
133 init_last_jobs_list();
135 last_jobs->append(je);
136 if (last_jobs->size() > max_last_jobs) {
137 je = (struct s_last_job *)last_jobs->first();
138 last_jobs->remove(je);
146 uint64_t write_last_jobs_list(int fd, uint64_t addr)
148 struct s_last_job *je;
151 Dmsg1(100, "write_last_jobs seek to %d\n", (int)addr);
152 if (lseek(fd, (off_t)addr, SEEK_SET) < 0) {
156 /* First record is number of entries */
157 num = last_jobs->size();
158 if (write(fd, &num, sizeof(num)) != sizeof(num)) {
159 Dmsg1(000, "Error writing num_items: ERR=%s\n", strerror(errno));
162 foreach_dlist(je, last_jobs) {
163 if (write(fd, je, sizeof(struct s_last_job)) != sizeof(struct s_last_job)) {
164 Dmsg1(000, "Error writing job: ERR=%s\n", strerror(errno));
169 /* Return current address */
170 ssize_t stat = lseek(fd, 0, SEEK_CUR);
178 void lock_last_jobs_list()
180 /* Use jcr chain mutex */
184 void unlock_last_jobs_list()
186 /* Use jcr chain mutex */
191 * Push a subroutine address into the job end callback stack
193 void job_end_push(JCR *jcr, void job_end_cb(JCR *jcr,void *), void *ctx)
195 jcr->job_end_push.append((void *)job_end_cb);
196 jcr->job_end_push.append(ctx);
199 /* Pop each job_end subroutine and call it */
200 static void job_end_pop(JCR *jcr)
202 void (*job_end_cb)(JCR *jcr, void *ctx);
204 for (int i=jcr->job_end_push.size()-1; i > 0; ) {
205 ctx = jcr->job_end_push.get(i--);
206 job_end_cb = (void (*)(JCR *,void *))jcr->job_end_push.get(i--);
207 job_end_cb(jcr, ctx);
212 * Create a Job Control Record and link it into JCR chain
213 * Returns newly allocated JCR
214 * Note, since each daemon has a different JCR, he passes
217 JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr)
220 MQUEUE_ITEM *item = NULL;
221 struct sigaction sigtimer;
223 Dmsg0(3400, "Enter new_jcr\n");
224 jcr = (JCR *)malloc(size);
225 memset(jcr, 0, size);
226 jcr->my_thread_id = pthread_self();
227 jcr->msg_queue = New(dlist(item, &item->link));
228 jcr->job_end_push.init(1, false);
229 jcr->sched_time = time(NULL);
230 jcr->daemon_free_jcr = daemon_free_jcr; /* plug daemon free routine */
232 pthread_mutex_init(&(jcr->mutex), NULL);
233 jcr->JobStatus = JS_Created; /* ready to run */
234 jcr->VolumeName = get_pool_memory(PM_FNAME);
235 jcr->VolumeName[0] = 0;
236 jcr->errmsg = get_pool_memory(PM_MESSAGE);
238 /* Setup some dummy values */
239 bstrncpy(jcr->Job, "*System*", sizeof(jcr->Job));
241 jcr->JobType = JT_SYSTEM; /* internal job until defined */
242 jcr->JobLevel = L_NONE;
243 jcr->JobStatus = JS_Created;
245 sigtimer.sa_flags = 0;
246 sigtimer.sa_handler = timeout_handler;
247 sigfillset(&sigtimer.sa_mask);
248 sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL);
251 * Locking jobs is a global lock that is needed
252 * so that the Director can stop new jobs from being
253 * added to the jcr chain while it processes a new
254 * conf file and does the job_end_push().
259 jcrs = New(dlist(jcr, &jcr->link));
270 * Remove a JCR from the chain
271 * NOTE! The chain must be locked prior to calling
274 static void remove_jcr(JCR *jcr)
276 Dmsg0(3400, "Enter remove_jcr\n");
278 Emsg0(M_ABORT, 0, _("NULL jcr.\n"));
281 Dmsg0(3400, "Leave remove_jcr\n");
285 * Free stuff common to all JCRs. N.B. Be careful to include only
286 * generic stuff in the common part of the jcr.
288 static void free_common_jcr(JCR *jcr)
290 struct s_last_job *je, last_job;
292 /* Keep some statistics */
293 switch (jcr->JobType) {
301 last_job.Errors = jcr->Errors;
302 last_job.JobType = jcr->JobType;
303 last_job.JobId = jcr->JobId;
304 last_job.VolSessionId = jcr->VolSessionId;
305 last_job.VolSessionTime = jcr->VolSessionTime;
306 bstrncpy(last_job.Job, jcr->Job, sizeof(last_job.Job));
307 last_job.JobFiles = jcr->JobFiles;
308 last_job.JobBytes = jcr->JobBytes;
309 last_job.JobStatus = jcr->JobStatus;
310 last_job.JobLevel = jcr->JobLevel;
311 last_job.start_time = jcr->start_time;
312 last_job.end_time = time(NULL);
313 /* Keep list of last jobs, but not Console where JobId==0 */
314 if (last_job.JobId > 0) {
315 je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
316 memcpy((char *)je, (char *)&last_job, sizeof(last_job));
318 init_last_jobs_list();
320 last_jobs->append(je);
321 if (last_jobs->size() > max_last_jobs) {
322 je = (struct s_last_job *)last_jobs->first();
323 last_jobs->remove(je);
331 pthread_mutex_destroy(&jcr->mutex);
333 delete jcr->msg_queue;
334 close_msg(jcr); /* close messages for this job */
336 /* do this after closing messages */
337 if (jcr->client_name) {
338 free_pool_memory(jcr->client_name);
339 jcr->client_name = NULL;
343 free_pool_memory(jcr->attr);
347 if (jcr->sd_auth_key) {
348 free(jcr->sd_auth_key);
349 jcr->sd_auth_key = NULL;
351 if (jcr->VolumeName) {
352 free_pool_memory(jcr->VolumeName);
353 jcr->VolumeName = NULL;
356 if (jcr->dir_bsock) {
357 bnet_close(jcr->dir_bsock);
358 jcr->dir_bsock = NULL;
361 free_pool_memory(jcr->errmsg);
368 if (jcr->cached_path) {
369 free_pool_memory(jcr->cached_path);
370 jcr->cached_path = NULL;
373 free_getuser_cache();
374 free_getgroup_cache();
379 * Global routine to free a jcr
382 void b_free_jcr(const char *file, int line, JCR *jcr)
384 Dmsg3(3400, "Enter free_jcr 0x%x from %s:%d\n", jcr, file, line);
388 void free_jcr(JCR *jcr)
391 Dmsg2(3400, "Enter free_jcr 0x%x job=%d\n", jcr, jcr->JobId);
395 dequeue_messages(jcr);
397 jcr->dec_use_count(); /* decrement use count */
398 if (jcr->use_count < 0) {
399 Emsg2(M_ERROR, 0, _("JCR use_count=%d JobId=%d\n"),
400 jcr->use_count, jcr->JobId);
402 Dmsg3(3400, "Dec free_jcr 0x%x use_count=%d jobid=%d\n", jcr, jcr->use_count, jcr->JobId);
403 if (jcr->use_count > 0) { /* if in use */
405 Dmsg3(3400, "free_jcr 0x%x job=%d use_count=%d\n", jcr, jcr->JobId, jcr->use_count);
409 remove_jcr(jcr); /* remove Jcr from chain */
412 job_end_pop(jcr); /* pop and call hooked routines */
414 Dmsg1(3400, "End job=%d\n", jcr->JobId);
415 if (jcr->daemon_free_jcr) {
416 jcr->daemon_free_jcr(jcr); /* call daemon free routine */
418 free_common_jcr(jcr);
419 close_msg(NULL); /* flush any daemon messages */
420 garbage_collect_memory_pool();
421 Dmsg0(3400, "Exit free_jcr\n");
426 * Given a JobId, find the JCR
427 * Returns: jcr on success
430 JCR *get_jcr_by_id(uint32_t JobId)
434 lock_jcr_chain(); /* lock chain */
435 foreach_dlist(jcr, jcrs) {
436 if (jcr->JobId == JobId) {
437 jcr->inc_use_count();
438 Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
447 * Given a SessionId and SessionTime, find the JCR
448 * Returns: jcr on success
451 JCR *get_jcr_by_session(uint32_t SessionId, uint32_t SessionTime)
456 foreach_dlist(jcr, jcrs) {
457 if (jcr->VolSessionId == SessionId &&
458 jcr->VolSessionTime == SessionTime) {
459 jcr->inc_use_count();
460 Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
470 * Given a Job, find the JCR
471 * compares on the number of characters in Job
472 * thus allowing partial matches.
473 * Returns: jcr on success
476 JCR *get_jcr_by_partial_name(char *Job)
486 foreach_dlist(jcr, jcrs) {
487 if (strncmp(Job, jcr->Job, len) == 0) {
488 jcr->inc_use_count();
489 Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
499 * Given a Job, find the JCR
500 * requires an exact match of names.
501 * Returns: jcr on success
504 JCR *get_jcr_by_full_name(char *Job)
512 foreach_dlist(jcr, jcrs) {
513 if (strcmp(jcr->Job, Job) == 0) {
514 jcr->inc_use_count();
515 Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
523 void set_jcr_job_status(JCR *jcr, int JobStatus)
526 * For a set of errors, ... keep the current status
527 * so it isn't lost. For all others, set it.
529 switch (jcr->JobStatus) {
530 case JS_ErrorTerminated:
537 jcr->JobStatus = JobStatus;
541 #ifdef TRACE_JCR_CHAIN
542 static int lock_count = 0;
548 #ifdef TRACE_JCR_CHAIN
549 static void b_lock_jcr_chain(const char *fname, int line)
551 static void lock_jcr_chain()
554 #ifdef TRACE_JCR_CHAIN
555 Dmsg3(3400, "Lock jcr chain %d from %s:%d\n", ++lock_count,
564 #ifdef TRACE_JCR_CHAIN
565 static void b_unlock_jcr_chain(const char *fname, int line)
567 static void unlock_jcr_chain()
570 #ifdef TRACE_JCR_CHAIN
571 Dmsg3(3400, "Unlock jcr chain %d from %s:%d\n", lock_count--,
579 * Start walk of jcr chain
580 * The proper way to walk the jcr chain is:
587 * It is possible to leave out the endeach_jcr(jcr), but
588 * in that case, the last jcr referenced must be explicitly
594 JCR *jcr_walk_start()
598 jcr = (JCR *)jcrs->first();
600 jcr->inc_use_count();
601 Dmsg3(3400, "Inc jcr_walk_start 0x%x job=%d use_count=%d\n", jcr,
602 jcr->JobId, jcr->use_count);
609 * Get next jcr from chain, and release current one
611 JCR *jcr_walk_next(JCR *prev_jcr)
616 jcr = (JCR *)jcrs->next(prev_jcr);
618 jcr->inc_use_count();
619 Dmsg3(3400, "Inc jcr_walk_next 0x%x job=%d use_count=%d\n", jcr,
620 jcr->JobId, jcr->use_count);
630 * Release last jcr referenced
632 void jcr_walk_end(JCR *jcr)
641 * Setup to call the timeout check routine every 30 seconds
642 * This routine will check any timers that have been enabled.
644 bool init_jcr_subsystem(void)
646 watchdog_t *wd = new_watchdog();
648 wd->one_shot = false;
649 wd->interval = 30; /* FIXME: should be configurable somewhere, even
650 if only with a #define */
651 wd->callback = jcr_timeout_check;
653 register_watchdog(wd);
658 static void jcr_timeout_check(watchdog_t *self)
664 Dmsg0(3400, "Start JCR timeout checks\n");
666 /* Walk through all JCRs checking if any one is
667 * blocked for more than specified max time.
670 if (jcr->JobId == 0) {
673 fd = jcr->store_bsock;
675 timer_start = fd->timer_start;
676 if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
677 fd->timer_start = 0; /* turn off timer */
678 fd->timed_out = true;
679 Jmsg(jcr, M_ERROR, 0, _(
680 "Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"),
681 watchdog_time - timer_start);
682 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
685 fd = jcr->file_bsock;
687 timer_start = fd->timer_start;
688 if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
689 fd->timer_start = 0; /* turn off timer */
690 fd->timed_out = true;
691 Jmsg(jcr, M_ERROR, 0, _(
692 "Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"),
693 watchdog_time - timer_start);
694 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
699 timer_start = fd->timer_start;
700 if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
701 fd->timer_start = 0; /* turn off timer */
702 fd->timed_out = true;
703 Jmsg(jcr, M_ERROR, 0, _(
704 "Watchdog sending kill after %d secs to thread stalled reading Director.\n"),
705 watchdog_time - timer_start);
706 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
712 Dmsg0(3400, "Finished JCR timeout checks\n");
716 * Timeout signal comes here
718 extern "C" void timeout_handler(int sig)
720 return; /* thus interrupting the function */