2 * Manipulation routines for Job Control Records and
3 * handling of last_jobs_list.
5 * Kern E. Sibbald, December 2000
9 * These routines are thread safe.
11 * The job list routines were re-written in May 2005 to
12 * eliminate the global lock while traversing the list, and
13 * to use the dlist subroutines. The locking is now done
14 * on the list each time the list is modified or traversed.
15 * That is, it is "micro-locked" rather than globally locked.
16 * The result is that there is one lock/unlock for each entry
17 * in the list while traversing it rather than a single lock
18 * at the beginning of a traversal and one at the end. This
19 * incurs slightly more overhead, but effectively eliminates
20 * the possibility of race conditions. In addition, with the
21 * exception of the global locking of the list during the
22 * re-reading of the config file, no recursion is needed.
26 Copyright (C) 2000-2005 Kern Sibbald
28 This program is free software; you can redistribute it and/or
29 modify it under the terms of the GNU General Public License
30 version 2 as amended with additional clauses defined in the
31 file LICENSE in the main source directory.
33 This program is distributed in the hope that it will be useful,
34 but WITHOUT ANY WARRANTY; without even the implied warranty of
35 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
36 file LICENSE for additional details.
43 /* External variables we reference */
44 extern time_t watchdog_time;
46 /* Forward referenced functions */
47 extern "C" void timeout_handler(int sig);
48 static void jcr_timeout_check(watchdog_t *self);
// With TRACE_JCR_CHAIN defined, lock_jcr_chain() and unlock_jcr_chain() are
// macros that forward __FILE__ and __LINE__ of the call site to the b_ variants.
// NOTE(review): both macro bodies already end in a semicolon, so a call written
// with its own semicolon expands to two statements -- check for call sites that
// sit in an unbraced if or else branch.
49 #ifdef TRACE_JCR_CHAIN
50 static void b_lock_jcr_chain(const char *filen, int line);
51 static void b_unlock_jcr_chain(const char *filen, int line);
52 #define lock_jcr_chain() b_lock_jcr_chain(__FILE__, __LINE__);
53 #define unlock_jcr_chain() b_unlock_jcr_chain(__FILE__, __LINE__);
55 static void lock_jcr_chain();
56 static void unlock_jcr_chain();
// List of recently finished jobs; allocated by init_last_jobs_list().
61 dlist *last_jobs = NULL;
// Cap applied to last_jobs: once the list size exceeds this value the entry at
// the head of the list is removed.
62 const int max_last_jobs = 10;
64 static dlist *jcrs = NULL; /* JCR chain */
65 //static brwlock_t lock; /* lock for last jobs and JCR chain */
// NOTE(review): the code that takes these two mutexes is not visible in this
// listing; jcr_lock presumably backs lock_jcr_chain() -- TODO confirm.
66 static pthread_mutex_t jcr_lock = PTHREAD_MUTEX_INITIALIZER;
68 static pthread_mutex_t job_start_mutex = PTHREAD_MUTEX_INITIALIZER;
// Allocate the two global lists used by this file: last_jobs (elements of type
// struct s_last_job) and jcrs (the JCR chain).
80 void init_last_jobs_list()
// job_entry is a NULL typed pointer handed to the dlist constructor together
// with the address of its link member; presumably dlist uses the pair only to
// compute the link offset -- TODO confirm against dlist.
84 struct s_last_job *job_entry = NULL;
86 last_jobs = New(dlist(job_entry, &job_entry->link));
87 // if ((errstat=rwl_init(&lock)) != 0) {
88 // Emsg1(M_ABORT, 0, _("Unable to initialize jcr_chain lock. ERR=%s\n"),
89 // strerror(errstat));
93 jcrs = New(dlist(jcr, &jcr->link));
// Empty last_jobs by repeatedly detaching the entry at the head of the list
// until the list reports empty.
97 void term_last_jobs_list()
100 while (!last_jobs->empty()) {
101 void *je = last_jobs->first();
102 last_jobs->remove(je);
107 // rwl_destroy(&lock);
// Load saved last-job records from fd, starting at byte offset addr, and append
// them to last_jobs.
// Layout read here: an item count (num), then that many raw images of
// struct s_last_job. Because records are the in-memory struct image, the data
// is only meaningful to a build with the same struct layout.
112 void read_last_jobs_list(int fd, uint64_t addr)
114 struct s_last_job *je, job;
// The debug message narrows the 64-bit addr to int; only the log text is affected.
117 Dmsg1(100, "read_last_jobs seek to %d\n", (int)addr);
// addr == 0 is tested together with a failed seek.
118 if (addr == 0 || lseek(fd, (off_t)addr, SEEK_SET) < 0) {
121 if (read(fd, &num, sizeof(num)) != sizeof(num)) {
124 Dmsg1(100, "Read num_items=%d\n", num);
125 if (num > 4 * max_last_jobs) { /* sanity check */
128 for ( ; num; num--) {
129 if (read(fd, &job, sizeof(job)) != sizeof(job)) {
130 Dmsg1(000, "Read job entry. ERR=%s\n", strerror(errno));
// Each record is copied to its own heap block before being linked in.
// NOTE(review): the malloc result is passed to memcpy on the next line with no
// NULL check.
134 je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
135 memcpy((char *)je, (char *)&job, sizeof(job));
// NOTE(review): presumably executed only when last_jobs does not exist yet; the
// guard is not visible here -- TODO confirm.
137 init_last_jobs_list();
139 last_jobs->append(je);
// Keep the list bounded: when it grows past max_last_jobs, take the head entry off.
140 if (last_jobs->size() > max_last_jobs) {
141 je = (struct s_last_job *)last_jobs->first();
142 last_jobs->remove(je);
// Write last_jobs to fd at byte offset addr: first the entry count, then every
// entry as a raw image of struct s_last_job (the format read_last_jobs_list reads).
149 uint64_t write_last_jobs_list(int fd, uint64_t addr)
151 struct s_last_job *je;
// The debug message narrows the 64-bit addr to int; only the log text is affected.
154 Dmsg1(100, "write_last_jobs seek to %d\n", (int)addr);
155 if (lseek(fd, (off_t)addr, SEEK_SET) < 0) {
159 /* First record is number of entries */
160 num = last_jobs->size();
161 if (write(fd, &num, sizeof(num)) != sizeof(num)) {
162 Dmsg1(000, "Error writing num_items: ERR=%s\n", strerror(errno));
165 foreach_dlist(je, last_jobs) {
166 if (write(fd, je, sizeof(struct s_last_job)) != sizeof(struct s_last_job)) {
167 Dmsg1(000, "Error writing job: ERR=%s\n", strerror(errno));
172 /* Return current address */
// NOTE(review): lseek returns off_t; storing it in ssize_t can narrow the value
// on platforms where off_t is wider than ssize_t -- confirm.
173 ssize_t stat = lseek(fd, 0, SEEK_CUR);
// Lock guarding last_jobs. There is no dedicated mutex: per the comment below,
// the JCR chain mutex is used.
181 void lock_last_jobs_list()
183 /* Use jcr chain mutex */
// Counterpart of lock_last_jobs_list(). Per the comment below, the JCR chain
// mutex is the one involved.
187 void unlock_last_jobs_list()
189 /* Use jcr chain mutex */
194 * Push a subroutine address into the job end callback stack
// Each registration occupies two consecutive slots of jcr->job_end_push: the
// callback pointer first, then its ctx argument. job_end_pop() relies on that
// ordering.
196 void job_end_push(JCR *jcr, void job_end_cb(JCR *jcr,void *), void *ctx)
198 jcr->job_end_push.append((void *)job_end_cb);
199 jcr->job_end_push.append(ctx);
202 /* Pop each job_end subroutine and call it */
// Slots were stored as callback then ctx, so walking from the last index down
// yields ctx first and the callback second. Callbacks therefore run in reverse
// order of registration. The loop ends once the index is no longer above zero.
203 static void job_end_pop(JCR *jcr)
205 void (*job_end_cb)(JCR *jcr, void *ctx);
207 for (int i=jcr->job_end_push.size()-1; i > 0; ) {
208 ctx = jcr->job_end_push.get(i--);
209 job_end_cb = (void (*)(JCR *,void *))jcr->job_end_push.get(i--);
210 job_end_cb(jcr, ctx);
215 * Create a Job Control Record and link it into JCR chain
216 * Returns newly allocated JCR
217 * Note, since each daemon has a different JCR, he passes
// size: number of bytes to allocate for the record.
// daemon_free_jcr: daemon-specific release routine, stored in the record and
// invoked by free_jcr().
220 JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr)
223 MQUEUE_ITEM *item = NULL;
224 struct sigaction sigtimer;
226 Dmsg0(3400, "Enter new_jcr\n");
// The record is zero-filled, so every field not set below starts as 0 or NULL.
// NOTE(review): the malloc result is passed to memset on the next line with no
// NULL check.
227 jcr = (JCR *)malloc(size);
228 memset(jcr, 0, size);
// The creating thread is recorded; jcr_timeout_check() signals this thread id.
229 jcr->my_thread_id = pthread_self();
230 jcr->msg_queue = New(dlist(item, &item->link));
231 jcr->job_end_push.init(1, false);
232 jcr->sched_time = time(NULL);
233 jcr->daemon_free_jcr = daemon_free_jcr; /* plug daemon free routine */
235 pthread_mutex_init(&(jcr->mutex), NULL);
236 jcr->JobStatus = JS_Created; /* ready to run */
// VolumeName starts out as an empty string in pool memory.
237 jcr->VolumeName = get_pool_memory(PM_FNAME);
238 jcr->VolumeName[0] = 0;
239 jcr->errmsg = get_pool_memory(PM_MESSAGE);
241 /* Setup some dummy values */
242 bstrncpy(jcr->Job, "*System*", sizeof(jcr->Job));
244 jcr->JobType = JT_SYSTEM; /* internal job until defined */
245 jcr->JobLevel = L_NONE;
// JobStatus was already set to JS_Created above; this second assignment is redundant.
246 jcr->JobStatus = JS_Created;
// Install timeout_handler for TIMEOUT_SIGNAL. sa_flags is 0 and the mask is
// filled, so all signals are blocked while the handler runs. This installation
// is repeated on every new_jcr() call.
248 sigtimer.sa_flags = 0;
249 sigtimer.sa_handler = timeout_handler;
250 sigfillset(&sigtimer.sa_mask);
251 sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL);
254 * Locking jobs is a global lock that is needed
255 * so that the Director can stop new jobs from being
256 * added to the jcr chain while it processes a new
257 * conf file and does the job_end_push().
// NOTE(review): presumably executed only when the chain list does not exist
// yet; the guard is not visible here -- TODO confirm.
262 jcrs = New(dlist(jcr, &jcr->link));
273 * Remove a JCR from the chain
274 * NOTE! The chain must be locked prior to calling
277 static void remove_jcr(JCR *jcr)
279 Dmsg0(3400, "Enter remove_jcr\n");
// NOTE(review): judging by the message text this M_ABORT is meant for a NULL
// jcr; the test that selects it is not visible here -- TODO confirm.
281 Emsg0(M_ABORT, 0, "NULL jcr.\n");
284 Dmsg0(3400, "Leave remove_jcr\n");
288 * Free stuff common to all JCRs. N.B. Be careful to include only
289 * generic stuff in the common part of the jcr.
// Two phases are visible: first a summary of the job is captured for the
// last_jobs list, then the resources owned by the common part of the JCR are
// released.
291 static void free_common_jcr(JCR *jcr)
293 struct s_last_job *je, last_job;
295 /* Keep some statistics */
296 switch (jcr->JobType) {
// Snapshot identifiers, counters, status and level of the job into last_job.
302 last_job.Errors = jcr->Errors;
303 last_job.JobType = jcr->JobType;
304 last_job.JobId = jcr->JobId;
305 last_job.VolSessionId = jcr->VolSessionId;
306 last_job.VolSessionTime = jcr->VolSessionTime;
307 bstrncpy(last_job.Job, jcr->Job, sizeof(last_job.Job));
308 last_job.JobFiles = jcr->JobFiles;
309 last_job.JobBytes = jcr->JobBytes;
310 last_job.JobStatus = jcr->JobStatus;
311 last_job.JobLevel = jcr->JobLevel;
312 last_job.start_time = jcr->start_time;
// end_time is the moment the JCR is freed, not a value stored in the JCR.
313 last_job.end_time = time(NULL);
314 /* Keep list of last jobs, but not Console where JobId==0 */
315 if (last_job.JobId > 0) {
// NOTE(review): the malloc result is passed to memcpy on the next line with no
// NULL check.
316 je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
317 memcpy((char *)je, (char *)&last_job, sizeof(last_job));
// NOTE(review): presumably executed only when last_jobs does not exist yet; the
// guard is not visible here -- TODO confirm.
319 init_last_jobs_list();
321 last_jobs->append(je);
// Keep the list bounded: when it grows past max_last_jobs, take the head entry off.
322 if (last_jobs->size() > max_last_jobs) {
323 je = (struct s_last_job *)last_jobs->first();
324 last_jobs->remove(je);
// Resource release starts here. Pointers visible below are reset to NULL after
// being released.
332 pthread_mutex_destroy(&jcr->mutex);
334 delete jcr->msg_queue;
335 close_msg(jcr); /* close messages for this job */
337 /* do this after closing messages */
338 if (jcr->client_name) {
339 free_pool_memory(jcr->client_name);
340 jcr->client_name = NULL;
344 free_pool_memory(jcr->attr);
// sd_auth_key is released with free() while the other buffers here go back via
// free_pool_memory().
348 if (jcr->sd_auth_key) {
349 free(jcr->sd_auth_key);
350 jcr->sd_auth_key = NULL;
352 if (jcr->VolumeName) {
353 free_pool_memory(jcr->VolumeName);
354 jcr->VolumeName = NULL;
357 if (jcr->dir_bsock) {
358 bnet_close(jcr->dir_bsock);
359 jcr->dir_bsock = NULL;
362 free_pool_memory(jcr->errmsg);
369 if (jcr->cached_path) {
370 free_pool_memory(jcr->cached_path);
371 jcr->cached_path = NULL;
374 free_getuser_cache();
375 free_getgroup_cache();
380 * Global routine to free a jcr
// Two entry headers are visible. b_free_jcr() also receives and logs the file
// and line of the caller.
// NOTE(review): how one header is selected (presumably a preprocessor
// conditional) is not visible here -- TODO confirm.
// NOTE(review): the Dmsg calls below pass a pointer for a %x conversion;
// confirm the formatter tolerates that on 64-bit targets.
383 void b_free_jcr(const char *file, int line, JCR *jcr)
385 Dmsg3(3400, "Enter free_jcr 0x%x from %s:%d\n", jcr, file, line);
389 void free_jcr(JCR *jcr)
392 Dmsg1(3400, "Enter free_jcr 0x%x\n", jcr);
396 dequeue_messages(jcr);
// Drop one reference. A negative count is reported as an error.
398 jcr->dec_use_count(); /* decrement use count */
399 if (jcr->use_count < 0) {
400 Emsg2(M_ERROR, 0, _("JCR use_count=%d JobId=%d\n"),
401 jcr->use_count, jcr->JobId);
403 Dmsg3(3400, "Dec free_jcr 0x%x use_count=%d jobid=%d\n", jcr, jcr->use_count, jcr->JobId);
// A positive count means other holders remain.
404 if (jcr->use_count > 0) { /* if in use */
406 Dmsg2(3400, "free_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
// Teardown order visible below: unlink from the chain, run the job-end
// callbacks, call the daemon-specific free routine if one was plugged in, then
// release the common part.
410 remove_jcr(jcr); /* remove Jcr from chain */
413 job_end_pop(jcr); /* pop and call hooked routines */
415 Dmsg1(3400, "End job=%d\n", jcr->JobId);
416 if (jcr->daemon_free_jcr) {
417 jcr->daemon_free_jcr(jcr); /* call daemon free routine */
419 free_common_jcr(jcr);
420 close_msg(NULL); /* flush any daemon messages */
421 Dmsg0(3400, "Exit free_jcr\n");
426 * Given a JobId, find the JCR
427 * Returns: jcr on success
// The chain is searched under the chain lock. The use count of the matching
// JCR is incremented, so the caller holds a reference; presumably it is
// released with free_jcr() -- TODO confirm.
430 JCR *get_jcr_by_id(uint32_t JobId)
434 lock_jcr_chain(); /* lock chain */
435 foreach_dlist(jcr, jcrs) {
436 if (jcr->JobId == JobId) {
437 jcr->inc_use_count();
438 Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
447 * Given a SessionId and SessionTime, find the JCR
448 * Returns: jcr on success
// A JCR matches only when both VolSessionId and VolSessionTime are equal to the
// arguments. The use count of the match is incremented, so the caller holds a
// reference; presumably it is released with free_jcr() -- TODO confirm.
451 JCR *get_jcr_by_session(uint32_t SessionId, uint32_t SessionTime)
456 foreach_dlist(jcr, jcrs) {
457 if (jcr->VolSessionId == SessionId &&
458 jcr->VolSessionTime == SessionTime) {
459 jcr->inc_use_count();
460 Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
470 * Given a Job, find the JCR
471 * compares on the number of characters in Job
472 * thus allowing partial matches.
473 * Returns: jcr on success
// Only the first len characters are compared, so Job acts as a prefix of the
// stored name. NOTE(review): the computation of len is not visible here; per
// the header note it is the length of Job -- TODO confirm.
// The use count of the match is incremented, so the caller holds a reference.
476 JCR *get_jcr_by_partial_name(char *Job)
486 foreach_dlist(jcr, jcrs) {
487 if (strncmp(Job, jcr->Job, len) == 0) {
488 jcr->inc_use_count();
489 Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
499 * Given a Job, find the JCR
500 * requires an exact match of names.
501 * Returns: jcr on success
// The whole name is compared with strcmp. The use count of the match is
// incremented, so the caller holds a reference; presumably it is released with
// free_jcr() -- TODO confirm.
504 JCR *get_jcr_by_full_name(char *Job)
512 foreach_dlist(jcr, jcrs) {
513 if (strcmp(jcr->Job, Job) == 0) {
514 jcr->inc_use_count();
515 Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
// Update jcr->JobStatus with the new JobStatus value, subject to the rule in
// the note below.
523 void set_jcr_job_status(JCR *jcr, int JobStatus)
526 * For a set of errors, ... keep the current status
527 * so it isn't lost. For all others, set it.
// The switch inspects the status already stored in the JCR, not the incoming value.
529 switch (jcr->JobStatus) {
530 case JS_ErrorTerminated:
537 jcr->JobStatus = JobStatus;
541 #ifdef TRACE_JCR_CHAIN
// Trace-only counter: incremented in the lock trace message and decremented in
// the unlock trace message.
542 static int lock_count = 0;
// Two headers are visible: the traced variant takes the file name and line of
// the caller, the plain variant takes no arguments.
548 #ifdef TRACE_JCR_CHAIN
549 static void b_lock_jcr_chain(const char *fname, int line)
551 static void lock_jcr_chain()
// NOTE(review): lock_count is modified inside the Dmsg3 argument list. If Dmsg3
// is a macro that skips its arguments below the debug level, the counter is not
// updated -- confirm.
555 #ifdef TRACE_JCR_CHAIN
556 Dmsg3(3400, "Lock jcr chain %d from %s:%d\n", ++lock_count,
559 // if ((errstat=rwl_writelock(&lock)) != 0) {
560 // Emsg1(M_ABORT, 0, "rwl_writelock failure. ERR=%s\n",
561 // strerror(errstat));
// Two headers are visible: the traced variant takes the file name and line of
// the caller, the plain variant takes no arguments.
569 #ifdef TRACE_JCR_CHAIN
570 static void b_unlock_jcr_chain(const char *fname, int line)
572 static void unlock_jcr_chain()
// NOTE(review): lock_count is modified inside the Dmsg3 argument list. If Dmsg3
// is a macro that skips its arguments below the debug level, the counter is not
// updated -- confirm.
576 #ifdef TRACE_JCR_CHAIN
577 Dmsg3(3400, "Unlock jcr chain %d from %s:%d\n", lock_count--,
580 // if ((errstat=rwl_writeunlock(&lock)) != 0) {
581 // Emsg1(M_ABORT, 0, "rwl_writeunlock failure. ERR=%s\n",
582 // strerror(errstat));
// Step to the chain entry that follows prev_jcr and take a reference on it by
// incrementing its use count.
// NOTE(review): the handling of the end of the chain (no next entry) is not
// visible here -- TODO confirm.
588 JCR *get_next_jcr(JCR *prev_jcr)
593 jcr = (JCR *)jcrs->next(prev_jcr);
595 jcr->inc_use_count();
596 Dmsg2(3400, "Inc get_next_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
// Register a repeating watchdog whose callback is jcr_timeout_check().
602 bool init_jcr_subsystem(void)
604 watchdog_t *wd = new_watchdog();
// one_shot is false, so the watchdog is not a single-fire timer.
606 wd->one_shot = false;
// Interval of 30 -- presumably seconds; the unit is defined by the watchdog
// code, TODO confirm.
607 wd->interval = 30; /* FIXME: should be configurable somewhere, even
608 if only with a #define */
609 wd->callback = jcr_timeout_check;
611 register_watchdog(wd);
// Watchdog callback. For each JCR, three sockets are examined in turn (Storage
// daemon, File daemon, Director according to the messages). When a socket timer
// has been running longer than the socket timeout, the timer is cleared, the
// socket is flagged timed_out, an error is logged and TIMEOUT_SIGNAL is sent to
// the thread recorded in the JCR.
// NOTE(review): the elapsed time passed to Jmsg is a difference involving
// watchdog_time (time_t) but is formatted with %d; on targets where time_t is
// wider than int the argument does not match the conversion -- confirm and cast.
616 static void jcr_timeout_check(watchdog_t *self)
622 Dmsg0(3400, "Start JCR timeout checks\n");
624 /* Walk through all JCRs checking if any one is
625 * blocked for more than specified max time.
628 if (jcr->JobId == 0) {
632 fd = jcr->store_bsock;
634 timer_start = fd->timer_start;
635 if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
636 fd->timer_start = 0; /* turn off timer */
637 fd->timed_out = TRUE;
638 Jmsg(jcr, M_ERROR, 0, _(
639 "Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"),
640 watchdog_time - timer_start);
641 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
644 fd = jcr->file_bsock;
646 timer_start = fd->timer_start;
647 if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
648 fd->timer_start = 0; /* turn off timer */
649 fd->timed_out = TRUE;
650 Jmsg(jcr, M_ERROR, 0, _(
651 "Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"),
652 watchdog_time - timer_start);
653 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
// NOTE(review): the assignment of fd for this third group is not visible here;
// the message indicates the Director socket -- TODO confirm.
658 timer_start = fd->timer_start;
659 if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
660 fd->timer_start = 0; /* turn off timer */
661 fd->timed_out = TRUE;
662 Jmsg(jcr, M_ERROR, 0, _(
663 "Watchdog sending kill after %d secs to thread stalled reading Director.\n"),
664 watchdog_time - timer_start);
665 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
671 Dmsg0(3400, "Finished JCR timeout checks\n");
675 * Timeout signal comes here
// The handler does no work of its own. new_jcr() installs it with sa_flags set
// to 0, so delivery of the signal is what interrupts the blocked call in the
// target thread.
677 extern "C" void timeout_handler(int sig)
679 return; /* thus interrupting the function */